Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/08 13:32:30 UTC

incubator-kylin git commit: KYLIN-820

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 bdde514c3 -> 8b60283a4


KYLIN-820


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8b60283a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8b60283a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8b60283a

Branch: refs/heads/0.8.0
Commit: 8b60283a47f5494e4422704c8c6ca3a878eaca1f
Parents: bdde514
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Jun 8 18:24:01 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Jun 8 19:33:13 2015 +0800

----------------------------------------------------------------------
 bin/kylin.sh                                    |   4 +-
 .../kylin/job/streaming/BootstrapConfig.java    |  34 ++++++
 .../kylin/job/streaming/CubeStreamConsumer.java |   2 +-
 .../kylin/job/streaming/StreamingBootstrap.java |  77 ++++++++++----
 .../kylin/job/streaming/StreamingCLI.java       |  36 +++++--
 .../kylin/streaming/OneOffStreamBuilder.java    |  83 +++++++++++++++
 .../apache/kylin/streaming/StreamBuilder.java   | 101 ++----------------
 .../apache/kylin/streaming/StreamFetcher.java   | 104 +++++++++++++++++++
 .../apache/kylin/streaming/StreamingUtil.java   |  82 +++++++++++++++
 .../kylin/streaming/TimePeriodCondition.java    |   8 ++
 10 files changed, 401 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/bin/kylin.sh b/bin/kylin.sh
index a179fc8..4bbecfc 100644
--- a/bin/kylin.sh
+++ b/bin/kylin.sh
@@ -81,7 +81,7 @@ then
     exit 0
 elif [ $1 == "streaming" ]
 then
-    if [ $# != 4 ]
+    if [ $# -lt 4 ]
     then
         echo 'invalid input args'
         exit -1
@@ -112,7 +112,7 @@ then
         -Dkylin.hive.dependency=${hive_dependency} \
         -Dkylin.hbase.dependency=${hbase_dependency} \
         -Dspring.profiles.active=${spring_profile} \
-        org.apache.kylin.job.streaming.StreamingCLI start $3 $4 > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/$3_$4 &
+        org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/$3_$4 &
         echo "streaming started $3 partition $4"
         exit 0
     elif [ $2 == "stop" ]

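Since kylin.sh now forwards "$@" to StreamingCLI, any flags given after the streaming config name and partition id are passed through unchanged. As an illustration (the config name and timestamps here are hypothetical), a one-off build over a fixed window could be launched as:

    bin/kylin.sh streaming start my_kafka_config 0 -oneoff true -start 1433750400000 -end 1433754000000
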
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
new file mode 100644
index 0000000..5030b66
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.job.streaming;
+
+/**
+ */
+public class BootstrapConfig {
+
+    private boolean oneOff = false;
+    private long start = 0L;
+    private long end = 0L;
+
+    public boolean isOneOff() {
+        return oneOff;
+    }
+
+    public void setOneOff(boolean oneOff) {
+        this.oneOff = oneOff;
+    }
+
+    public long getStart() {
+        return start;
+    }
+
+    public void setStart(long start) {
+        this.start = start;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    public void setEnd(long end) {
+        this.end = end;
+    }
+}

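BootstrapConfig simply carries the optional one-off window; start and end default to 0L, which StreamingBootstrap treats as "unset" in its Preconditions checks. A minimal sketch of the intended population (timestamps hypothetical, in epoch milliseconds):

    BootstrapConfig config = new BootstrapConfig();
    config.setOneOff(true);          // request a bounded, one-off build
    config.setStart(1433750400000L); // window start, must be non-zero
    config.setEnd(1433754000000L);   // window end, must be non-zero
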
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 03dd92a..98435cb 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -95,7 +95,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
 
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getSecond(), false, false);
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getFirst(), microStreamBatch.getTimestamp().getSecond(), false, false);
         long start = System.currentTimeMillis();
         final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
         logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index bcc1b97..e6ce350 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -40,7 +40,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
-import kafka.javaapi.PartitionMetadata;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
@@ -63,6 +62,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -97,15 +97,6 @@ public class StreamingBootstrap {
         return bootstrap;
     }
 
-    private static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
-        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
-        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
-            return partitionMetadata.leader();
-        } else {
-            return null;
-        }
-    }
-
     public void stop() {
         for (KafkaConsumer consumer : kafkaConsumers.values()) {
             consumer.stop();
@@ -123,24 +114,34 @@ public class StreamingBootstrap {
         return result;
     }
 
-    public void start(String streaming, int partitionId) throws Exception {
+    public void start(String streaming, int partitionId, BootstrapConfig bootstrapConfig) throws Exception {
         final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
         Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
 
         if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
             startIIStreaming(streamingConfig, partitionId);
         } else if (!StringUtils.isEmpty(streamingConfig.getCubeName())) {
-            startCubeStreaming(streamingConfig, partitionId);
+            if (bootstrapConfig.isOneOff()) {
+                Preconditions.checkArgument(bootstrapConfig.getStart() != 0);
+                Preconditions.checkArgument(bootstrapConfig.getEnd() != 0);
+                startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+            } else {
+                startCubeStreaming(streamingConfig, partitionId);
+            }
         } else {
             throw new IllegalArgumentException("no cube or ii in kafka config");
         }
     }
 
-    private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionOffset) {
+    public void start(String streaming, int partitionId) throws Exception {
+        start(streaming, partitionId, new BootstrapConfig());
+    }
+
+    private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionIdOffset) {
         List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
         for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
-            final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-            final int transferredPartitionId = partitionId + partitionOffset;
+            final Broker leadBroker = StreamingUtil.getLeadBroker(kafkaClusterConfig, partitionId);
+            final int transferredPartitionId = partitionId + partitionIdOffset;
             final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
             long streamingOffset = latestOffset;
             if (partitionIdOffsetMap.containsKey(transferredPartitionId)) {
@@ -168,27 +169,27 @@ public class StreamingBootstrap {
         final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
 
         ArrayList<Integer> allPartitions = Lists.newArrayList();
-        int partitionOffset = 0;
+        int partitionIdOffset = 0;
         for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
             final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
             for (int i = 0; i < partitionCount; i++) {
-                allPartitions.add(i + partitionOffset);
+                allPartitions.add(i + partitionIdOffset);
             }
-            partitionOffset += partitionCount;
+            partitionIdOffset += partitionCount;
         }
         final Map<Integer, Long> partitionIdOffsetMap = streamingManager.getOffset(streamingConfig.getName(), allPartitions);
 
         int clusterID = 0;
-        partitionOffset = 0;
+        partitionIdOffset = 0;
         for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
             final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
             Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
 
-            final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionOffset);
+            final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionIdOffset);
             logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
             allClustersData.addAll(oneClusterData);
             clusterID++;
-            partitionOffset += partitionCount;
+            partitionIdOffset += partitionCount;
         }
 
         final String cubeName = streamingConfig.getCubeName();
@@ -219,6 +220,39 @@ public class StreamingBootstrap {
         }
     }
 
+    private void startOneOffCubeStreaming(StreamingConfig streamingConfig, long startTimestamp, long endTimestamp) throws Exception {
+        final String cubeName = streamingConfig.getCubeName();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final StreamParser streamParser = getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
+            @Nullable
+            @Override
+            public TblColRef apply(IntermediateColumnDesc input) {
+                return input.getColRef();
+            }
+        }));
+        final int batchInterval = 5 * 60 * 1000;
+        startTimestamp = TimeUtil.getNextPeriodStart(startTimestamp, batchInterval);
+        endTimestamp = TimeUtil.getNextPeriodStart(endTimestamp, batchInterval);
+        final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
+
+        int clusterId = 0;
+        for (KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
+            HashMap<Integer, Long> partitionIdOffsetMap = Maps.newHashMap();
+            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+            for (int i = 0; i < partitionCount; ++i) {
+                partitionIdOffsetMap.put(i, StreamingUtil.findClosestOffsetWithDataTimestamp(kafkaClusterConfig, i, startTimestamp, streamParser));
+            }
+            final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
+            queues.addAll(oneClusterQueue);
+            logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+        }
+
+        logger.info(String.format("starting one off streaming build with timestamp{%d, %d}", startTimestamp, endTimestamp));
+        OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp);
+        Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
+    }
+
     private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
 
         List<KafkaClusterConfig> allClustersConfigs = streamingConfig.getKafkaClusterConfigs();
@@ -236,7 +269,7 @@ public class StreamingBootstrap {
         Preconditions.checkArgument(ii.getSegments().size() > 0);
         final IISegment iiSegment = ii.getSegments().get(0);
 
-        final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
+        final Broker leadBroker = StreamingUtil.getLeadBroker(kafkaClusterConfig, partitionId);
         Preconditions.checkState(leadBroker != null, "cannot find lead broker");
         final int shard = ii.getDescriptor().getSharding();
         Preconditions.checkArgument(shard % partitionCount == 0);

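startOneOffCubeStreaming aligns both window bounds to 5-minute boundaries with TimeUtil.getNextPeriodStart before resolving Kafka offsets. Assuming that helper rounds a timestamp up to the next multiple of the interval (an assumption about that helper, not verified here), the alignment amounts to:

    // assumed semantics of TimeUtil.getNextPeriodStart(timestamp, batchInterval):
    // round the timestamp up to the next multiple of batchInterval (5 minutes here)
    long batchInterval = 5 * 60 * 1000L;
    long aligned = ((timestamp + batchInterval - 1) / batchInterval) * batchInterval;
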
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index ea2b2a5..c120015 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -34,6 +34,7 @@
 
 package org.apache.kylin.job.streaming;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.cache.RemoteCacheUpdater;
@@ -49,20 +50,33 @@ public class StreamingCLI {
 
     public static void main(String[] args) {
         try {
-            if (args.length != 3) {
-                printArgsError(args);
-                return;
-            }
-
             AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
 
-            if (args[0].equals("start")) {
-                String kafkaConfName = args[1];
-                int partition = Integer.parseInt(args[2]);
-                StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, partition);
-            } else {
-                printArgsError(args);
+            Preconditions.checkArgument(args[0].equals("streaming"));
+            Preconditions.checkArgument(args[1].equals("start"));
+
+            String kafkaConfName = args[2];
+            int partition = Integer.parseInt(args[3]);
+            int i = 4;
+            BootstrapConfig bootstrapConfig = new BootstrapConfig();
+            while (i < args.length) {
+                String argName = args[i];
+                switch (argName) {
+                    case "-oneoff":
+                        bootstrapConfig.setOneOff(Boolean.parseBoolean(args[i + 1]));
+                        break;
+                    case "-start":
+                        bootstrapConfig.setStart(Long.parseLong(args[i + 1]));
+                        break;
+                    case "-end":
+                        bootstrapConfig.setEnd(Long.parseLong(args[i + 1]));
+                        break;
+                    default:
+                        throw new RuntimeException("invalid argName:" + argName);
+                }
+                i += 2;
             }
+            StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, partition, bootstrapConfig);
         } catch (Exception e) {
             printArgsError(args);
             logger.error("error start streaming", e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
new file mode 100644
index 0000000..2c062cc
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.DateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ */
+public class OneOffStreamBuilder implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(OneOffStreamBuilder.class);
+
+
+    private final String streaming;
+    private final List<BlockingQueue<StreamMessage>> queues;
+    private final MicroStreamBatchConsumer consumer;
+    private final TimePeriodCondition batchCondition;
+    private final StreamParser streamParser;
+
+    public OneOffStreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, MicroStreamBatchConsumer consumer, long startTime, long endTime) {
+        Preconditions.checkArgument(queues.size() > 0);
+        this.batchCondition = new TimePeriodCondition(startTime, endTime);
+        this.streaming = streaming;
+        this.queues = queues;
+        this.consumer = Preconditions.checkNotNull(consumer);
+        this.streamParser = streamParser;
+    }
+
+    @Override
+    public void run() {
+        try {
+            final int inputCount = queues.size();
+            final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
+            final CountDownLatch countDownLatch = new CountDownLatch(inputCount);
+            final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList();
+            int partitionId = 0;
+            for (BlockingQueue<StreamMessage> queue : queues) {
+                futures.add(executorService.submit(new StreamFetcher(partitionId++, queue, countDownLatch, batchCondition, streamParser)));
+            }
+            countDownLatch.await();
+            List<MicroStreamBatch> batches = Lists.newLinkedList();
+            for (Future<MicroStreamBatch> future : futures) {
+                if (future.get() != null) {
+                    batches.add(future.get());
+                } else {
+                    logger.warn("EOF encountered, stop streaming");
+                }
+            }
+
+            MicroStreamBatch batch = batches.get(0);
+            if (batches.size() > 1) {
+                for (int i = 1; i < inputCount; i++) {
+                    if (batches.get(i).size() > 0) {
+                        batch = MicroStreamBatch.union(batch, batches.get(i));
+                    }
+                }
+            }
+            batch.getTimestamp().setFirst(batchCondition.getStartTime());
+            batch.getTimestamp().setSecond(batchCondition.getEndTime());
+
+            logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(batch.size()), DateFormat.formatToTimeStr(batch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(batch.getTimestamp().getSecond()) });
+            long startTime = System.currentTimeMillis();
+            consumer.consume(batch);
+            logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+        } catch (InterruptedException ie) {
+            throw new RuntimeException("this thread should not be interrupted", ie);
+        } catch (ExecutionException ee) {
+            logger.error("fetch stream error", ee);
+            throw new RuntimeException(ee);
+        } catch (Exception e) {
+            logger.error("error consume batch", e);
+            throw new RuntimeException("error consume batch", e);
+        }
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 76e58cf..d97d86c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -136,7 +136,7 @@ public class StreamBuilder implements Runnable {
                 ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
                 int partitionId = 0;
                 for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
-                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start))));
+                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start), getStreamParser())));
                 }
                 countDownLatch.await();
                 ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -152,10 +152,15 @@ public class StreamBuilder implements Runnable {
                 MicroStreamBatch batch = batches.get(0);
                 if (batches.size() > 1) {
                     for (int i = 1; i < inputCount; i++) {
-                        if (batches.get(i).size() > 0)
+                        if (batches.get(i).size() > 0) {
                             batch = MicroStreamBatch.union(batch, batches.get(i));
+                        }
                     }
                 }
+                if (batches.size() > 1) {
+                    batch.getTimestamp().setFirst(start);
+                    batch.getTimestamp().setSecond(start + batchInterval);
+                }
                 if (batchInterval > 0) {
                     start += batchInterval;
                 }
@@ -189,98 +194,6 @@ public class StreamBuilder implements Runnable {
         }
     }
 
-    private class StreamFetcher implements Callable<MicroStreamBatch> {
-
-        private final BlockingQueue<StreamMessage> streamMessageQueue;
-        private final CountDownLatch countDownLatch;
-        private final int partitionId;
-        private final BatchCondition condition;
-
-        public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition) {
-            this.partitionId = partitionId;
-            this.streamMessageQueue = streamMessageQueue;
-            this.countDownLatch = countDownLatch;
-            this.condition = condition;
-        }
-
-        private void clearCounter() {
-        }
-
-        private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
-            long t = System.currentTimeMillis();
-            while (true) {
-                final StreamMessage peek = queue.peek();
-                if (peek != null) {
-                    return peek;
-                } else {
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e) {
-                        logger.warn("stream queue should not be interrupted", e);
-                        return null;
-                    }
-                    if (System.currentTimeMillis() - t > timeout) {
-                        break;
-                    }
-                }
-            }
-            return queue.peek();
-        }
-
-        @Override
-        public MicroStreamBatch call() throws Exception {
-            try {
-                MicroStreamBatch microStreamBatch = null;
-                while (true) {
-                    if (microStreamBatch == null) {
-                        microStreamBatch = new MicroStreamBatch(partitionId);
-                        clearCounter();
-                    }
-                    StreamMessage streamMessage = peek(streamMessageQueue, 30000);
-                    if (streamMessage == null) {
-                        logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
-                        if (!microStreamBatch.isEmpty()) {
-                            return microStreamBatch;
-                        } else {
-                            continue;
-                        }
-                    }
-                    if (streamMessage.getOffset() < 0) {
-                        consumer.stop();
-                        logger.warn("streaming encountered EOF, stop building");
-                        return null;
-                    }
-
-                    microStreamBatch.incRawMessageCount();
-                    final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
-                    if (parsedStreamMessage == null) {
-                        throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
-                    }
-
-                    final BatchCondition.Result result = condition.apply(parsedStreamMessage);
-                    if (parsedStreamMessage.isAccepted()) {
-                        if (result == BatchCondition.Result.ACCEPT) {
-                            streamMessageQueue.take();
-                            microStreamBatch.add(parsedStreamMessage);
-                        } else if (result == BatchCondition.Result.DISCARD) {
-                            streamMessageQueue.take();
-                        } else if (result == BatchCondition.Result.REJECT) {
-                            return microStreamBatch;
-                        }
-                    } else {
-                        streamMessageQueue.take();
-                    }
-                }
-            } catch (Exception e) {
-                logger.error("build stream error, stop building", e);
-                throw new RuntimeException("build stream error, stop building", e);
-            } finally {
-                logger.info("one partition sign off");
-                countDownLatch.countDown();
-            }
-        }
-    }
-
     public final StreamParser getStreamParser() {
         return streamParser;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
new file mode 100644
index 0000000..4f29610
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -0,0 +1,104 @@
+package org.apache.kylin.streaming;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ */
+public class StreamFetcher implements Callable<MicroStreamBatch> {
+    private static final Logger logger = LoggerFactory.getLogger(StreamFetcher.class);
+
+    private final BlockingQueue<StreamMessage> streamMessageQueue;
+    private final CountDownLatch countDownLatch;
+    private final int partitionId;
+    private final BatchCondition condition;
+    private final StreamParser streamParser;
+
+    public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition, StreamParser streamParser) {
+        this.partitionId = partitionId;
+        this.streamMessageQueue = streamMessageQueue;
+        this.countDownLatch = countDownLatch;
+        this.condition = condition;
+        this.streamParser = streamParser;
+    }
+
+    private void clearCounter() {
+    }
+
+    private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
+        long t = System.currentTimeMillis();
+        while (true) {
+            final StreamMessage peek = queue.peek();
+            if (peek != null) {
+                return peek;
+            } else {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    logger.warn("stream queue should not be interrupted", e);
+                    return null;
+                }
+                if (System.currentTimeMillis() - t > timeout) {
+                    break;
+                }
+            }
+        }
+        return queue.peek();
+    }
+
+    @Override
+    public MicroStreamBatch call() throws Exception {
+        try {
+            MicroStreamBatch microStreamBatch = null;
+            while (true) {
+                if (microStreamBatch == null) {
+                    microStreamBatch = new MicroStreamBatch(partitionId);
+                    clearCounter();
+                }
+                StreamMessage streamMessage = peek(streamMessageQueue, 30000);
+                if (streamMessage == null) {
+                    logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
+                    if (!microStreamBatch.isEmpty()) {
+                        return microStreamBatch;
+                    } else {
+                        continue;
+                    }
+                }
+                if (streamMessage.getOffset() < 0) {
+                    logger.warn("streaming encountered EOF, stop building");
+                    return null;
+                }
+
+                microStreamBatch.incRawMessageCount();
+                final ParsedStreamMessage parsedStreamMessage = streamParser.parse(streamMessage);
+                if (parsedStreamMessage == null) {
+                    throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
+                }
+
+                final BatchCondition.Result result = condition.apply(parsedStreamMessage);
+                if (parsedStreamMessage.isAccepted()) {
+                    if (result == BatchCondition.Result.ACCEPT) {
+                        streamMessageQueue.take();
+                        microStreamBatch.add(parsedStreamMessage);
+                    } else if (result == BatchCondition.Result.DISCARD) {
+                        streamMessageQueue.take();
+                    } else if (result == BatchCondition.Result.REJECT) {
+                        return microStreamBatch;
+                    }
+                } else {
+                    streamMessageQueue.take();
+                }
+            }
+        } catch (Exception e) {
+            logger.error("build stream error, stop building", e);
+            throw new RuntimeException("build stream error, stop building", e);
+        } finally {
+            logger.info("one partition sign off");
+            countDownLatch.countDown();
+        }
+    }
+}

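Note that StreamFetcher peeks before it takes: a message is removed from the queue only on ACCEPT or DISCARD, so a REJECTed message (one beyond the batch boundary) stays queued for the next batch. A sketch of a time-window BatchCondition with these semantics (WindowCondition and the exact result mapping are hypothetical; TimePeriodCondition below is the real implementation):

    // hypothetical sketch: map message timestamps onto BatchCondition.Result
    public class WindowCondition implements BatchCondition {
        private final long startTime;
        private final long endTime;

        public WindowCondition(long startTime, long endTime) {
            this.startTime = startTime;
            this.endTime = endTime;
        }

        @Override
        public Result apply(ParsedStreamMessage message) {
            if (message.getTimestamp() < startTime) {
                return Result.DISCARD; // too early: drop and keep reading
            } else if (message.getTimestamp() < endTime) {
                return Result.ACCEPT;  // inside the window: add to the batch
            } else {
                return Result.REJECT;  // past the window: close this batch
            }
        }
    }
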
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
new file mode 100644
index 0000000..2a693a6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -0,0 +1,82 @@
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ */
+public final class StreamingUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingUtil.class);
+
+    private StreamingUtil(){}
+
+    public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
+        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
+        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            return partitionMetadata.leader();
+        } else {
+            return null;
+        }
+    }
+
+    public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser) {
+        final String topic = kafkaClusterConfig.getTopic();
+        final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
+        final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
+        Preconditions.checkArgument(response.errorCode(topic, partitionId) == 0, "errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
+        final MessageAndOffset messageAndOffset = response.messageSet(topic, partitionId).iterator().next();
+        final ByteBuffer payload = messageAndOffset.message().payload();
+        byte[] bytes = new byte[payload.limit()];
+        payload.get(bytes);
+        final ParsedStreamMessage parsedStreamMessage = streamParser.parse(new StreamMessage(messageAndOffset.offset(), bytes));
+        return parsedStreamMessage.getTimestamp();
+    }
+
+    public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
+        final String topic = kafkaClusterConfig.getTopic();
+        final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
+        final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
+        final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
+        logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
+        return binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
+    }
+
+    private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
+        while (startOffset + 2 <= endOffset) {
+            long midOffset = startOffset + ((endOffset - startOffset) >> 1);
+            long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser);
+            long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
+            long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser);
+            // hard to ensure these 2 conditions
+//            Preconditions.checkArgument(startTimestamp <= midTimestamp);
+//            Preconditions.checkArgument(midTimestamp <= endTimestamp);
+            if (startTimestamp > targetTimestamp) {
+                return startOffset;
+            }
+            if (endTimestamp < targetTimestamp) {
+                return endOffset;
+            }
+            if (targetTimestamp == midTimestamp) {
+                return midOffset;
+            } else if (targetTimestamp < midTimestamp) {
+                endOffset = midOffset;
+                continue;
+            } else {
+                startOffset = midOffset;
+                continue;
+            }
+        }
+        return startOffset;
+
+
+    }
+}

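findClosestOffsetWithDataTimestamp relies on message timestamps being roughly non-decreasing with offset, which is why the strict ordering Preconditions are left commented out. The search itself is a plain binary search over the offset range; a toy model over an in-memory array (hypothetical data) shows the same narrowing:

    // toy model of the offset search: timestamps indexed by offset
    public static void main(String[] args) {
        long[] timestampsByOffset = { 100L, 200L, 300L, 400L, 500L };
        long target = 350L;
        int lo = 0, hi = timestampsByOffset.length - 1;
        while (lo + 2 <= hi) {
            int mid = lo + ((hi - lo) >> 1);
            if (timestampsByOffset[mid] == target) { lo = mid; break; }
            else if (timestampsByOffset[mid] > target) hi = mid;
            else lo = mid;
        }
        System.out.println("closest offset: " + lo); // prints 2, the message just before target
    }
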
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
index 7752437..d3349d5 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -10,8 +10,16 @@ public class TimePeriodCondition implements BatchCondition {
     public TimePeriodCondition(long startTime, long endTime) {
         this.startTime = startTime;
         this.endTime = endTime;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
 
+    public long getEndTime() {
+        return endTime;
     }
+
     @Override
     public Result apply(ParsedStreamMessage message) {
         if (message.getTimestamp() < startTime) {