Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/05 06:35:49 UTC

[1/3] incubator-kylin git commit: add support for commit offset for multi partitions

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 dda27de23 -> 4f7b1bffa


add support for commit offset for multi partitions


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6133d735
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6133d735
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6133d735

Branch: refs/heads/0.8.0
Commit: 6133d7356426cf2229ac0d1122227ddcf2c7a3bc
Parents: dda27de
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 18:14:56 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 11:56:23 2015 +0800

----------------------------------------------------------------------
 .../kylin/streaming/StreamingManager.java       | 48 ++++++++++++++++++++
 .../kylin/streaming/StreamingManagerTest.java   | 37 +++++++++++++++
 2 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6133d735/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
index f025216..3552edc 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -34,13 +34,23 @@
 
 package org.apache.kylin.streaming;
 
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -95,6 +105,10 @@ public class StreamingManager {
         return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
     }
 
+    private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+    }
+
 
     public boolean createOrUpdateKafkaConfig(String name, StreamingConfig config) {
         try {
@@ -142,4 +156,38 @@ public class StreamingManager {
         }
     }
 
+    public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
+        Collections.sort(partitions);
+        final String resPath = formatStreamingOutputPath(streaming, partitions);
+        try {
+            final InputStream inputStream = getStore().getResource(resPath);
+            if (inputStream == null) {
+                return Collections.emptyMap();
+            }
+            final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
+            return result;
+        } catch (IOException e) {
+            logger.error("error get offset, path:" + resPath, e);
+            throw new RuntimeException("error get offset, path:" + resPath, e);
+        }
+    }
+
+    public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
+        List<Integer> partitions = Lists.newLinkedList(offset.keySet());
+        Collections.sort(partitions);
+        final String resPath = formatStreamingOutputPath(streaming, partitions);
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            mapper.writeValue(baos, offset);
+            getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
+        } catch (IOException e) {
+            logger.error("error update offset, path:" + resPath, e);
+            throw new RuntimeException("error update offset, path:" + resPath, e);
+        }
+    }
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
+
+
 }
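
The store keeps one JSON document per streaming name and sorted partition set, e.g. ".../kafka_test_0_1.json", holding a map of partition id to offset. A minimal stand-alone sketch of the same Jackson round-trip used above (the offset values are hypothetical):

    import com.fasterxml.jackson.databind.JavaType;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.type.MapType;
    import com.fasterxml.jackson.databind.type.SimpleType;

    import java.util.HashMap;

    public class OffsetJsonRoundTrip {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // same type construction as StreamingManager: HashMap<Integer, Long>
            JavaType mapType = MapType.construct(HashMap.class,
                    SimpleType.construct(Integer.class), SimpleType.construct(Long.class));

            HashMap<Integer, Long> offsets = new HashMap<Integer, Long>();
            offsets.put(0, 1000L); // hypothetical committed offsets
            offsets.put(1, 2000L);

            String json = mapper.writeValueAsString(offsets);
            HashMap<Integer, Long> restored = mapper.readValue(json, mapType);
            System.out.println(json + " -> " + restored);
        }
    }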

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6133d735/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
index 772643b..7a53060 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
@@ -34,12 +34,20 @@
 
 package org.apache.kylin.streaming;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.TestCase;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.TestCase.assertNotNull;
 import static org.junit.Assert.fail;
@@ -82,11 +90,40 @@ public class StreamingManagerTest extends LocalFileMetadataTestCase {
         updateOffsetAndCompare(streaming, partition, 1000);
         updateOffsetAndCompare(streaming, partition, 800);
         updateOffsetAndCompare(streaming, partition, 2000);
+    }
+
+    @Test
+    public void testMultiOffset() {
+        final String streaming = "kafka_multi_test";
+        List<Integer> partitions = Lists.newArrayList(Lists.asList(0, 1, new Integer[]{2, 3}));
+        assertEquals(0, streamingManager.getOffset("kafka_multi_test", partitions).size());
 
+        for (int i = 0; i < 10; i++) {
+            updateOffsetAndCompare(streaming, generateRandomOffset(partitions));
+        }
+    }
+
+    private HashMap<Integer, Long> generateRandomOffset(List<Integer> partitions) {
+        final HashMap<Integer, Long> result = Maps.newHashMap();
+        final Random random = new Random();
+        for (Integer partition : partitions) {
+            result.put(partition, random.nextLong());
+        }
+        return result;
     }
 
     private void updateOffsetAndCompare(String streaming, int partition, long offset) {
         streamingManager.updateOffset(streaming, partition, offset);
         assertEquals(offset, streamingManager.getOffset(streaming, partition));
     }
+
+    private void updateOffsetAndCompare(String streaming, HashMap<Integer, Long> offset) {
+        streamingManager.updateOffset(streaming, offset);
+        final Map<Integer, Long> result = streamingManager.getOffset(streaming, Lists.newLinkedList(offset.keySet()));
+        System.out.println(result);
+        assertEquals(offset.size(), result.size());
+        for (Integer partition : result.keySet()) {
+            assertEquals(offset.get(partition), result.get(partition));
+        }
+    }
 }
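
From a caller's perspective, the new API round-trips a whole partition set at once. A sketch (imports as in the test above; the streaming name and offsets are hypothetical, and a configured Kylin metadata store is assumed):

    StreamingManager mgr = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());

    HashMap<Integer, Long> offsets = Maps.newHashMap();
    offsets.put(0, 500L); // hypothetical per-partition offsets
    offsets.put(1, 800L);
    mgr.updateOffset("kafka_test", offsets);

    // later, e.g. after a restart, the same partition set yields the committed map
    Map<Integer, Long> restored = mgr.getOffset("kafka_test", Lists.newArrayList(0, 1));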


[2/3] incubator-kylin git commit: refactor

Posted by qh...@apache.org.
refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/11e9d3e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/11e9d3e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/11e9d3e9

Branch: refs/heads/0.8.0
Commit: 11e9d3e92d3e886006ee526a2b3581bcf00f2bfe
Parents: 6133d73
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 20:29:30 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 11:57:01 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 38 +++++++++++++++-----
 .../apache/kylin/job/BuildIIWithStreamTest.java |  3 +-
 .../kylin/job/hadoop/invertedindex/IITest.java  |  2 +-
 .../job/streaming/CubeStreamConsumerTest.java   |  2 +-
 .../kylin/streaming/MicroStreamBatch.java       | 10 +++++-
 .../apache/kylin/streaming/StreamBuilder.java   | 31 +++++++++++++---
 6 files changed, 69 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index fd590d2..436c55b 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.lang.reflect.Constructor;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -135,16 +136,21 @@ public class StreamingBootstrap {
         }
     }
 
-    private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount) {
+    private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionOffset) {
         List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
         for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
             final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
-
+            final int transferredPartitionId = partitionId + partitionOffset;
             final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
             long streamingOffset = latestOffset;
-            logger.info("submitting offset:" + streamingOffset);
-
-            KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig, 1);
+            if (partitionIdOffsetMap.containsKey(transferredPartitionId)) {
+                final long earliestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
+                long committedOffset = partitionIdOffsetMap.get(transferredPartitionId);
+                Preconditions.checkArgument(committedOffset >= earliestOffset && committedOffset <= latestOffset, String.format("invalid offset:%d, earliestOffset:%d, latestOffset:%d", committedOffset, earliestOffset, latestOffset));
+                streamingOffset = committedOffset;
+            }
+            logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
+            KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
             Executors.newSingleThreadExecutor().submit(consumer);
             result.add(consumer.getStreamQueue(0));
         }
@@ -156,15 +162,28 @@ public class StreamingBootstrap {
 
         final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
 
+        ArrayList<Integer> allPartitions = Lists.newArrayList();
+        int partitionOffset = 0;
+        for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
+            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+            for (int i = 0; i < partitionCount; i++) {
+                allPartitions.add(i + partitionOffset);
+            }
+            partitionOffset += partitionCount;
+        }
+        final Map<Integer, Long> partitionIdOffsetMap = streamingManager.getOffset(streamingConfig.getName(), allPartitions);
+
         int clusterID = 0;
+        partitionOffset = 0;
         for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
             final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
             Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
 
-            final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount);
+            final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionOffset);
             logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
             allClustersData.addAll(oneClusterData);
             clusterID++;
+            partitionOffset += partitionCount;
         }
 
         final String cubeName = streamingConfig.getCubeName();
@@ -174,7 +193,7 @@ public class StreamingBootstrap {
         MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
         long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
         logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
             @Nullable
             @Override
@@ -242,7 +261,10 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism), new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+            final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+                    consumer.getStreamQueue(i % parallelism),
+                    new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
+                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index e84b176..6d660e5 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -214,7 +214,8 @@ public class BuildIIWithStreamTest {
         ToolRunner.run(new IICreateHTableJob(), args);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = new StreamBuilder(queue,
+        final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+                queue,
                 new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
                 new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
                 0);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index b2fb5ea..9a28ea4 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -85,7 +85,7 @@ public class IITest extends LocalFileMetadataTestCase {
         List<List<String>> parsedStreamMessages = Lists.newArrayList();
         StreamParser parser = StringStreamParser.instance;
 
-        MicroStreamBatch batch = new MicroStreamBatch();
+        MicroStreamBatch batch = new MicroStreamBatch(0);
         for (StreamMessage message : streamMessages) {
             ParsedStreamMessage parsedStreamMessage = parser.parse(message);
             if ((parsedStreamMessage.isAccepted())) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index 328ec72..ca0d037 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -64,7 +64,7 @@ public class CubeStreamConsumerTest {
     @Test
     public void test() throws Exception {
         LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         loadDataFromLocalFile(queue, 100000);
         future.get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index e1ff60c..6ecd592 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -9,6 +9,8 @@ import java.util.List;
  */
 public final class MicroStreamBatch {
 
+    private final int partitionId;
+
     private final List<List<String>> streams;
 
     private final Pair<Long, Long> timestamp;
@@ -17,18 +19,24 @@ public final class MicroStreamBatch {
 
     private int rawMessageCount;
 
-    public MicroStreamBatch() {
+    public MicroStreamBatch(int partitionId) {
+        this.partitionId = partitionId;
         this.streams = Lists.newLinkedList();
         this.timestamp = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
         this.offset = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
     }
 
     private MicroStreamBatch(MicroStreamBatch batch) {
+        this.partitionId = batch.partitionId;
         this.streams = Lists.newLinkedList(batch.streams);
         this.timestamp = Pair.newPair(batch.timestamp.getFirst(), batch.timestamp.getSecond());
         this.offset = Pair.newPair(batch.offset.getFirst(), batch.offset.getSecond());
     }
 
+    public int getPartitionId() {
+        return partitionId;
+    }
+
     public final List<List<String>> getStreams() {
         return streams;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11e9d3e9/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 8e5b0ca..cb8dfb1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -36,11 +36,14 @@ package org.apache.kylin.streaming;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.*;
 
@@ -59,8 +62,11 @@ public class StreamBuilder implements Runnable {
 
     private final MicroBatchCondition condition;
 
-    public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    private final String streaming;
+
+    public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
         Preconditions.checkArgument(inputs.size() > 0);
+        this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
         this.condition = condition;
@@ -68,7 +74,8 @@ public class StreamBuilder implements Runnable {
         init(inputs);
     }
 
-    public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+        this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
         this.condition = condition;
@@ -90,11 +97,16 @@ public class StreamBuilder implements Runnable {
             final int inputCount = streamMessageQueues.size();
             final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
             long start = startTimestamp;
+            List<Integer> partitions = Lists.newArrayList();
+            for (int i = 0, partitionCount = streamMessageQueues.size(); i < partitionCount; i++) {
+                partitions.add(i);
+            }
             while (true) {
                 CountDownLatch countDownLatch = new CountDownLatch(inputCount);
                 ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
+                int partitionId = 0;
                 for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
-                    futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
                 }
                 countDownLatch.await();
                 ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -126,6 +138,13 @@ public class StreamBuilder implements Runnable {
                     long startTime = System.currentTimeMillis();
                     consumer.consume(batch);
                     logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+                    if (batches.size() > 1) {
+                        final HashMap<Integer, Long> offset = Maps.newHashMap();
+                        for (MicroStreamBatch microStreamBatch : batches) {
+                            offset.put(microStreamBatch.getPartitionId(), microStreamBatch.getOffset().getSecond());
+                        }
+                        StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).updateOffset(streaming, offset);
+                    }
                 }
             }
         } catch (InterruptedException e) {
@@ -143,10 +162,12 @@ public class StreamBuilder implements Runnable {
 
         private final BlockingQueue<StreamMessage> streamMessageQueue;
         private final CountDownLatch countDownLatch;
+        private final int partitionId;
         private long startTimestamp;
         private long endTimestamp;
 
-        public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+        public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+            this.partitionId = partitionId;
             this.streamMessageQueue = streamMessageQueue;
             this.countDownLatch = countDownLatch;
             this.startTimestamp = startTimestamp;
@@ -183,7 +204,7 @@ public class StreamBuilder implements Runnable {
                 MicroStreamBatch microStreamBatch = null;
                 while (true) {
                     if (microStreamBatch == null) {
-                        microStreamBatch = new MicroStreamBatch();
+                        microStreamBatch = new MicroStreamBatch(partitionId);
                         clearCounter();
                     }
                     StreamMessage streamMessage = peek(streamMessageQueue, 30000);


[3/3] incubator-kylin git commit: refactor

Posted by qh...@apache.org.
refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4f7b1bff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4f7b1bff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4f7b1bff

Branch: refs/heads/0.8.0
Commit: 4f7b1bffa31773e434258db7532d6eb0944b94fe
Parents: 11e9d3e
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 5 12:32:15 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 12:32:15 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamConsumer.java |  2 +
 .../kylin/job/streaming/StreamingBootstrap.java | 10 +--
 .../apache/kylin/job/BuildIIWithStreamTest.java |  7 +-
 .../job/streaming/CubeStreamConsumerTest.java   |  6 +-
 .../apache/kylin/streaming/BatchCondition.java  | 16 ++++
 .../kylin/streaming/LimitedSizeCondition.java   | 29 +++++++
 .../kylin/streaming/MicroBatchCondition.java    | 22 ------
 .../apache/kylin/streaming/StreamBuilder.java   | 82 +++++++++++++-------
 .../kylin/streaming/TimePeriodCondition.java    | 30 +++++++
 9 files changed, 142 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index bd7c6cb..03dd92a 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -96,7 +96,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getSecond(), false, false);
+        long start = System.currentTimeMillis();
         final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+        logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));
 
         final Configuration conf = HadoopUtil.getCurrentConfiguration();
         final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 436c55b..e28fa9d 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -190,10 +190,9 @@ public class StreamingBootstrap {
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
 
         int batchInterval = 5 * 60 * 1000;
-        MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
         long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
         logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(streamingConfig.getName(), allClustersData, new CubeStreamConsumer(cubeName), startTimestamp, batchInterval);
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
             @Nullable
             @Override
@@ -261,10 +260,11 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+            final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(),
                     consumer.getStreamQueue(i % parallelism),
-                    new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
-                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
+                    0L,
+                    iiDesc.getSliceSize());
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 6d660e5..9693771 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -58,7 +58,6 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.streaming.MicroBatchCondition;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
 import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
@@ -214,11 +213,11 @@ public class BuildIIWithStreamTest {
         ToolRunner.run(new IICreateHTableJob(), args);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+        final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName,
                 queue,
-                new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
                 new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
-                0);
+                0,
+                segment.getIIDesc().getSliceSize());
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index ca0d037..8e3eb61 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.job.streaming;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -10,7 +11,6 @@ import org.apache.kylin.cube.CubeBuilder;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.streaming.MicroBatchCondition;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
 import org.junit.Before;
@@ -64,7 +64,9 @@ public class CubeStreamConsumerTest {
     @Test
     public void test() throws Exception {
         LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+        queues.add(queue);
+        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         loadDataFromLocalFile(queue, 100000);
         future.get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
new file mode 100644
index 0000000..0fa11c1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -0,0 +1,16 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public interface BatchCondition {
+
+    enum Result {
+        ACCEPT,
+        REJECT,
+        DISCARD
+    }
+
+    Result apply(ParsedStreamMessage message);
+
+    BatchCondition copy();
+}
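
The Result enum makes the fetcher's three outcomes explicit: ACCEPT adds the message to the batch, DISCARD drops it, and REJECT closes the batch while leaving the message on the queue. Conditions also compose naturally; as a hypothetical illustration (not part of this commit), a conjunction of two conditions could look like:

    // hypothetical composite, shown only to illustrate the extension point
    public class AndCondition implements BatchCondition {

        private final BatchCondition first;
        private final BatchCondition second;

        public AndCondition(BatchCondition first, BatchCondition second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public Result apply(ParsedStreamMessage message) {
            Result result = first.apply(message);
            if (result != Result.ACCEPT) {
                return result; // REJECT or DISCARD short-circuits
            }
            // note: if the second condition rejects, a counting first condition has
            // already consumed a slot; acceptable for a sketch, not for production
            return second.apply(message);
        }

        @Override
        public BatchCondition copy() {
            return new AndCondition(first.copy(), second.copy());
        }
    }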

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
new file mode 100644
index 0000000..3c1e367
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
@@ -0,0 +1,29 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class LimitedSizeCondition implements BatchCondition {
+
+    private final int limit;
+    private int count;
+
+    public LimitedSizeCondition(int limit) {
+        this.limit = limit;
+        this.count = 0;
+    }
+
+    @Override
+    public Result apply(ParsedStreamMessage message) {
+        if (count < limit) {
+            count++;
+            return Result.ACCEPT;
+        } else {
+            return Result.REJECT;
+        }
+    }
+
+    @Override
+    public BatchCondition copy() {
+        return new LimitedSizeCondition(this.limit);
+    }
+}
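
LimitedSizeCondition carries the batchSize half of the old MicroBatchCondition, but as mutable per-batch state: the counter lives in the condition, and copy() hands each fetch round a fresh instance. Since apply() never inspects the message, a null argument suffices to demonstrate it:

    BatchCondition condition = new LimitedSizeCondition(2);
    System.out.println(condition.apply(null)); // ACCEPT
    System.out.println(condition.apply(null)); // ACCEPT
    System.out.println(condition.apply(null)); // REJECT, the batch is full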

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
deleted file mode 100644
index baf7b04..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.kylin.streaming;
-
-/**
- */
-public final class MicroBatchCondition {
-
-    private final int batchSize;
-    private final int batchInterval;
-
-    public MicroBatchCondition(int batchSize, int batchInterval) {
-        this.batchSize = batchSize;
-        this.batchInterval = batchInterval;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public int getBatchInterval() {
-        return batchInterval;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cb8dfb1..3da4d79 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -52,7 +52,6 @@ import java.util.concurrent.*;
 public class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
-    private final long startTimestamp;
 
     private StreamParser streamParser = StringStreamParser.instance;
 
@@ -60,26 +59,49 @@ public class StreamBuilder implements Runnable {
 
     private final MicroStreamBatchConsumer consumer;
 
-    private final MicroBatchCondition condition;
-
     private final String streaming;
 
-    public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    private final long startTimestamp;
+
+    private final long batchInterval;
+
+    private final int batchSize;
+
+    public static final StreamBuilder newPeriodicalStreamBuilder(String streaming,
+                                                                 List<BlockingQueue<StreamMessage>> inputs,
+                                                                 MicroStreamBatchConsumer consumer,
+                                                                 long startTimestamp,
+                                                                 long batchInterval) {
+        return new StreamBuilder(streaming, inputs, consumer, startTimestamp, batchInterval);
+    }
+
+    public static final StreamBuilder newLimitedSizeStreamBuilder(String streaming,
+                                                                 BlockingQueue<StreamMessage> input,
+                                                                 MicroStreamBatchConsumer consumer,
+                                                                 long startTimestamp,
+                                                                 int batchSize) {
+        return new StreamBuilder(streaming, input, consumer, startTimestamp, batchSize);
+    }
+
+
+    private StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroStreamBatchConsumer consumer, long startTimestamp, long batchInterval) {
         Preconditions.checkArgument(inputs.size() > 0);
         this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
-        this.condition = condition;
         this.startTimestamp = startTimestamp;
+        this.batchInterval = batchInterval;
+        this.batchSize = -1;
         init(inputs);
     }
 
-    public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    private StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroStreamBatchConsumer consumer, long startTimestamp, int batchSize) {
         this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
-        this.condition = condition;
         this.startTimestamp = startTimestamp;
+        this.batchInterval = -1L;
+        this.batchSize = batchSize;
         init(Preconditions.checkNotNull(input));
     }
 
@@ -91,6 +113,14 @@ public class StreamBuilder implements Runnable {
         this.streamMessageQueues.addAll(inputs);
     }
 
+    private BatchCondition generateBatchCondition(long startTimestamp) {
+        if (batchInterval > 0) {
+            return new TimePeriodCondition(startTimestamp, startTimestamp + batchInterval);
+        } else {
+            return new LimitedSizeCondition(batchSize);
+        }
+    }
+
     @Override
     public void run() {
         try {
@@ -106,7 +136,7 @@ public class StreamBuilder implements Runnable {
                 ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
                 int partitionId = 0;
                 for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
-                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start))));
                 }
                 countDownLatch.await();
                 ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -125,9 +155,9 @@ public class StreamBuilder implements Runnable {
                         batch = MicroStreamBatch.union(batch, batches.get(i));
                     }
                 }
-                batch.getTimestamp().setFirst(start);
-                batch.getTimestamp().setSecond(start + condition.getBatchInterval());
-                start += condition.getBatchInterval();
+                if (batchInterval > 0) {
+                    start += batchInterval;
+                }
 
                 if (batch.size() == 0) {
                     logger.info("nothing to build, skip to next iteration after sleeping 10s");
@@ -163,15 +193,13 @@ public class StreamBuilder implements Runnable {
         private final BlockingQueue<StreamMessage> streamMessageQueue;
         private final CountDownLatch countDownLatch;
         private final int partitionId;
-        private long startTimestamp;
-        private long endTimestamp;
+        private final BatchCondition condition;
 
-        public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+        public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition) {
             this.partitionId = partitionId;
             this.streamMessageQueue = streamMessageQueue;
             this.countDownLatch = countDownLatch;
-            this.startTimestamp = startTimestamp;
-            this.endTimestamp = endTimestamp;
+            this.condition = condition;
         }
 
         private void clearCounter() {
@@ -228,22 +256,18 @@ public class StreamBuilder implements Runnable {
                         throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
                     }
 
-                    final long timestamp = parsedStreamMessage.getTimestamp();
-                    if (timestamp < startTimestamp) {
-                        //TODO properly handle late megs
-                        streamMessageQueue.take();
-                    } else if (timestamp < endTimestamp) {
-                        streamMessageQueue.take();
-                        if (parsedStreamMessage.isAccepted()) {
+                    final BatchCondition.Result result = condition.apply(parsedStreamMessage);
+                    if (parsedStreamMessage.isAccepted()) {
+                        if (result == BatchCondition.Result.ACCEPT) {
+                            streamMessageQueue.take();
                             microStreamBatch.add(parsedStreamMessage);
-                            if (microStreamBatch.size() >= condition.getBatchSize()) {
-                                return microStreamBatch;
-                            }
-                        } else {
-                            //ignore pruned stream message
+                        } else if (result == BatchCondition.Result.DISCARD) {
+                            streamMessageQueue.take();
+                        } else if (result == BatchCondition.Result.REJECT) {
+                            return microStreamBatch;
                         }
                     } else {
-                        return microStreamBatch;
+                        streamMessageQueue.take();
                     }
                 }
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
new file mode 100644
index 0000000..fdd35fc
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -0,0 +1,30 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class TimePeriodCondition implements BatchCondition {
+
+    private final long startTime;
+    private final long endTime;
+
+    public TimePeriodCondition(long startTime, long endTime) {
+        this.startTime = startTime;
+        this.endTime = endTime;
+
+    }
+    @Override
+    public Result apply(ParsedStreamMessage message) {
+        if (message.getTimestamp() < startTime) {
+            return Result.DISCARD;
+        } else if (message.getTimestamp() < endTime) {
+            return Result.ACCEPT;
+        } else {
+            return Result.REJECT;
+        }
+    }
+
+    @Override
+    public BatchCondition copy() {
+        return this;
+    }
+}
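
For a given window, messages stamped before startTime are DISCARDed as late arrivals, those inside the window are ACCEPTed, and anything at or past endTime is REJECTed, which per the StreamBuilder change above leaves the message on the queue for the next batch. Because the condition itself is immutable, copy() can safely return this. A stand-alone demonstration of the same branching (the window bounds are hypothetical, and Result stands in for BatchCondition.Result):

    public class TimeWindowDemo {

        enum Result { ACCEPT, REJECT, DISCARD }

        // mirrors TimePeriodCondition.apply(), with the timestamp passed directly
        static Result apply(long timestamp, long startTime, long endTime) {
            if (timestamp < startTime) {
                return Result.DISCARD; // late message
            } else if (timestamp < endTime) {
                return Result.ACCEPT;  // inside the batch window
            } else {
                return Result.REJECT;  // belongs to a later batch
            }
        }

        public static void main(String[] args) {
            long start = 1000L, end = 2000L; // hypothetical window
            for (long ts : new long[]{900L, 1000L, 1999L, 2000L}) {
                System.out.println(ts + " -> " + apply(ts, start, end));
            }
        }
    }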