You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/10 09:35:27 UTC

incubator-kylin git commit: KYLIN-820 minor change to kylin.sh to support running OneOffStreamBuilder from crontab

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 a52dfb67a -> 3a31d916a


KYLIN-820 minor change to kylin.sh to support running OneOffStreamBuilder from crontab


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3a31d916
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3a31d916
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3a31d916

Branch: refs/heads/0.8.0
Commit: 3a31d916abb0ea7a55a37aa5b278cc1472b655e0
Parents: a52dfb6
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Jun 9 20:07:17 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jun 10 15:33:17 2015 +0800

----------------------------------------------------------------------
 bin/kylin.sh                                    |  4 +--
 .../kylin/job/streaming/BootstrapConfig.java    | 28 +++++++++++++++
 .../kylin/job/streaming/StreamingBootstrap.java | 37 +++++++++-----------
 .../kylin/job/streaming/StreamingCLI.java       | 25 ++++++++-----
 .../apache/kylin/streaming/BatchCondition.java  |  2 +-
 .../kylin/streaming/OneOffStreamBuilder.java    |  4 +--
 .../kylin/streaming/TimePeriodCondition.java    | 13 +++++--
 7 files changed, 76 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/bin/kylin.sh b/bin/kylin.sh
index 4bbecfc..97c29c5 100644
--- a/bin/kylin.sh
+++ b/bin/kylin.sh
@@ -83,7 +83,7 @@ elif [ $1 == "streaming" ]
 then
     if [ $# -lt 4 ]
     then
-        echo 'invalid input args'
+        echo "invalid input args $@"
         exit -1
     fi
     if [ $2 == "start" ]
@@ -113,7 +113,7 @@ then
         -Dkylin.hbase.dependency=${hbase_dependency} \
         -Dspring.profiles.active=${spring_profile} \
         org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/$3_$4 &
-        echo "streaming started $3 partition $4"
+        echo "streaming started name: $3 id: $4"
         exit 0
     elif [ $2 == "stop" ]
     then

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
index 5030b66..a82fec3 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -4,9 +4,21 @@ package org.apache.kylin.job.streaming;
  */
 public class BootstrapConfig {
 
+    private String streaming;
+    private int partitionId = -1;
+
     private boolean oneOff = false;
     private long start = 0L;
     private long end = 0L;
+    private long margin = 0L;
+
+    public long getMargin() {
+        return margin;
+    }
+
+    public void setMargin(long margin) {
+        this.margin = margin;
+    }
 
     public boolean isOneOff() {
         return oneOff;
@@ -31,4 +43,20 @@ public class BootstrapConfig {
     public void setEnd(long end) {
         this.end = end;
     }
+
+    public String getStreaming() {
+        return streaming;
+    }
+
+    public void setStreaming(String streaming) {
+        this.streaming = streaming;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 809f454..5bdc670 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -111,19 +111,23 @@ public class StreamingBootstrap {
         return result;
     }
 
-    public void start(String streaming, int partitionId, BootstrapConfig bootstrapConfig) throws Exception {
+    public void start(BootstrapConfig bootstrapConfig) throws Exception {
+        final String streaming = bootstrapConfig.getStreaming();
+        Preconditions.checkNotNull(streaming, "streaming name cannot be empty");
         final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
         Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
 
         if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
+            int partitionId = bootstrapConfig.getPartitionId();
+            Preconditions.checkArgument(partitionId >= 0, "partitionId cannot be empty for inverted index streaming");
             startIIStreaming(streamingConfig, partitionId);
         } else if (!StringUtils.isEmpty(streamingConfig.getCubeName())) {
             if (bootstrapConfig.isOneOff()) {
                 Preconditions.checkArgument(bootstrapConfig.getStart() != 0);
                 Preconditions.checkArgument(bootstrapConfig.getEnd() != 0);
-                startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+                startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd(), bootstrapConfig.getMargin());
             } else {
-                startCubeStreaming(streamingConfig, partitionId);
+                startCubeStreaming(streamingConfig);
             }
         } else {
             throw new IllegalArgumentException("no cube or ii in kafka config");
@@ -131,7 +135,10 @@ public class StreamingBootstrap {
     }
 
     public void start(String streaming, int partitionId) throws Exception {
-        start(streaming, partitionId, new BootstrapConfig());
+        final BootstrapConfig bootstrapConfig = new BootstrapConfig();
+        bootstrapConfig.setPartitionId(partitionId);
+        bootstrapConfig.setStreaming(streaming);
+        start(bootstrapConfig);
     }
 
     private List<BlockingQueue<StreamMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionIdOffset) {
@@ -154,21 +161,13 @@ public class StreamingBootstrap {
             }
             logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
             KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-            final String threadName = String.format("%s_%d_%d", kafkaClusterConfig.getTopic(), clusterID, partitionId);
-            Executors.newSingleThreadExecutor(new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable r) {
-                    final Thread thread = new Thread(r, threadName);
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            }).submit(consumer);
+            Executors.newSingleThreadExecutor().submit(consumer);
             result.add(consumer.getStreamQueue(0));
         }
         return result;
     }
 
-    private void startCubeStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
+    private void startCubeStreaming(StreamingConfig streamingConfig) throws Exception {
         List<KafkaClusterConfig> kafkaClusterConfigs = streamingConfig.getKafkaClusterConfigs();
 
         final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
@@ -188,7 +187,6 @@ public class StreamingBootstrap {
         partitionIdOffset = 0;
         for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
             final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
-            Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
 
             final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionIdOffset);
             logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
@@ -225,7 +223,7 @@ public class StreamingBootstrap {
         }
     }
 
-    private void startOneOffCubeStreaming(StreamingConfig streamingConfig, long startTimestamp, long endTimestamp) throws Exception {
+    private void startOneOffCubeStreaming(StreamingConfig streamingConfig, long startTimestamp, long endTimestamp, long margin) throws Exception {
         final String cubeName = streamingConfig.getCubeName();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final StreamParser streamParser = getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
@@ -235,9 +233,6 @@ public class StreamingBootstrap {
                 return input.getColRef();
             }
         }));
-        final int batchInterval = 5 * 60 * 1000;
-        startTimestamp = TimeUtil.getNextPeriodStart(startTimestamp, batchInterval);
-        endTimestamp = TimeUtil.getNextPeriodStart(endTimestamp, batchInterval);
         final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
 
         int clusterId = 0;
@@ -248,7 +243,7 @@ public class StreamingBootstrap {
             final CountDownLatch countDownLatch = new CountDownLatch(partitionCount);
             for (int i = 0; i < partitionCount; ++i) {
                 final int idx = i;
-                final long start = startTimestamp;
+                final long start = startTimestamp - margin;
                 executorService.submit(new Runnable() {
                     @Override
                     public void run() {
@@ -270,7 +265,7 @@ public class StreamingBootstrap {
         }
 
         logger.info(String.format("starting one off streaming build with timestamp{%d, %d}", startTimestamp, endTimestamp));
-        OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp);
+        OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
         Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
         logger.info("one off build finished");
         System.exit(0);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index c120015..9b93852 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -55,28 +55,35 @@ public class StreamingCLI {
             Preconditions.checkArgument(args[0].equals("streaming"));
             Preconditions.checkArgument(args[1].equals("start"));
 
-            String kafkaConfName = args[2];
-            int partition = Integer.parseInt(args[3]);
-            int i = 4;
+            int i = 2;
             BootstrapConfig bootstrapConfig = new BootstrapConfig();
             while (i < args.length) {
                 String argName = args[i];
                 switch (argName) {
                     case "-oneoff":
-                        bootstrapConfig.setOneOff(Boolean.parseBoolean(args[i + 1]));
+                        bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
                         break;
                     case "-start":
-                        bootstrapConfig.setStart(Long.parseLong(args[i + 1]));
+                        bootstrapConfig.setStart(Long.parseLong(args[++i]));
                         break;
                     case "-end":
-                        bootstrapConfig.setEnd(Long.parseLong(args[i + 1]));
+                        bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+                        break;
+                    case "-streaming":
+                        bootstrapConfig.setStreaming(args[++i]);
+                        break;
+                    case "-partition":
+                        bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+                        break;
+                    case "-margin":
+                        bootstrapConfig.setMargin(Long.parseLong(args[++i]));
                         break;
                     default:
-                        throw new RuntimeException("invalid argName:" + argName);
+                        logger.warn("ignore this arg:" + argName);
                 }
-                i += 2;
+                i++;
             }
-            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, partition, bootstrapConfig);
+            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
         } catch (Exception e) {
             printArgsError(args);
             logger.error("error start streaming", e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
index 54f966c..2748911 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -5,7 +5,7 @@ package org.apache.kylin.streaming;
 public interface BatchCondition {
 
     enum Result {
-        ACCEPT, REJECT, DISCARD,LAST_ACCEPT_FOR_BATCH
+        ACCEPT, REJECT, DISCARD, LAST_ACCEPT_FOR_BATCH
     }
 
     Result apply(ParsedStreamMessage message);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
index 2c062cc..f076988 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
@@ -22,9 +22,9 @@ public class OneOffStreamBuilder implements Runnable {
     private final TimePeriodCondition batchCondition;
     private StreamParser streamParser;
 
-    public OneOffStreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, MicroStreamBatchConsumer consumer, long startTime, long endTime) {
+    public OneOffStreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, MicroStreamBatchConsumer consumer, long startTime, long endTime, long margin) {
         Preconditions.checkArgument(queues.size() > 0);
-        this.batchCondition = new TimePeriodCondition(startTime, endTime);
+        this.batchCondition = new TimePeriodCondition(startTime, endTime, margin);
         this.streaming = streaming;
         this.queues = queues;
         this.consumer = Preconditions.checkNotNull(consumer);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
index d3349d5..560504c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -6,10 +6,16 @@ public class TimePeriodCondition implements BatchCondition {
 
     private final long startTime;
     private final long endTime;
+    private final long margin;
 
     public TimePeriodCondition(long startTime, long endTime) {
+        this(startTime, endTime, 0L);
+    }
+
+    public TimePeriodCondition(long startTime, long endTime, long margin) {
         this.startTime = startTime;
         this.endTime = endTime;
+        this.margin = margin;
     }
 
     public long getStartTime() {
@@ -22,10 +28,13 @@ public class TimePeriodCondition implements BatchCondition {
 
     @Override
     public Result apply(ParsedStreamMessage message) {
-        if (message.getTimestamp() < startTime) {
+        final long timestamp = message.getTimestamp();
+        if (timestamp < startTime) {
             return Result.DISCARD;
-        } else if (message.getTimestamp() < endTime) {
+        } else if (timestamp < endTime) {
             return Result.ACCEPT;
+        } else if (timestamp < endTime + margin) {
+            return Result.DISCARD;
         } else {
             return Result.REJECT;
         }