Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/05 06:35:51 UTC

[3/3] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4f7b1bff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4f7b1bff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4f7b1bff

Branch: refs/heads/0.8.0
Commit: 4f7b1bffa31773e434258db7532d6eb0944b94fe
Parents: 11e9d3e
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 5 12:32:15 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 12:32:15 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamConsumer.java |  2 +
 .../kylin/job/streaming/StreamingBootstrap.java | 10 +--
 .../apache/kylin/job/BuildIIWithStreamTest.java |  7 +-
 .../job/streaming/CubeStreamConsumerTest.java   |  6 +-
 .../apache/kylin/streaming/BatchCondition.java  | 16 ++++
 .../kylin/streaming/LimitedSizeCondition.java   | 29 +++++++
 .../kylin/streaming/MicroBatchCondition.java    | 22 ------
 .../apache/kylin/streaming/StreamBuilder.java   | 82 +++++++++++++-------
 .../kylin/streaming/TimePeriodCondition.java    | 30 +++++++
 9 files changed, 142 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
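Summary of the refactor, as reflected in the diff below: the value-holder MicroBatchCondition is removed and replaced by a BatchCondition interface with two implementations, TimePeriodCondition (time-window batches) and LimitedSizeCondition (size-capped batches). StreamBuilder's public constructors become private and are replaced by two static factories, newPeriodicalStreamBuilder and newLimitedSizeStreamBuilder, and the call sites in StreamingBootstrap and the tests are updated accordingly. A minimal sketch of the new call sites follows; the stream name, cube name, queue wiring and batch sizes are illustrative only, not taken from this commit.

    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;

    import org.apache.kylin.job.streaming.CubeStreamConsumer;
    import org.apache.kylin.streaming.StreamBuilder;
    import org.apache.kylin.streaming.StreamMessage;

    import com.google.common.collect.Lists;

    public class StreamBuilderFactoryExample {
        public static void main(String[] args) {
            // one blocking queue per partition feeding the builder
            List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
            queues.add(new LinkedBlockingDeque<StreamMessage>());

            // time-driven batches: one micro batch per batchInterval ms, starting at startTimestamp
            long batchInterval = 5 * 60 * 1000L;
            StreamBuilder periodical = StreamBuilder.newPeriodicalStreamBuilder(
                    "my_streaming", queues, new CubeStreamConsumer("my_cube"),
                    System.currentTimeMillis(), batchInterval);

            // size-driven batches: one micro batch per batchSize accepted messages, single queue
            StreamBuilder limited = StreamBuilder.newLimitedSizeStreamBuilder(
                    "my_streaming", queues.get(0), new CubeStreamConsumer("my_cube"),
                    0L, 10000);

            // each builder is a Runnable and is normally submitted to an executor,
            // as StreamingBootstrap and the tests below do.
        }
    }
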


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index bd7c6cb..03dd92a 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -96,7 +96,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getSecond(), false, false);
+        long start = System.currentTimeMillis();
         final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+        logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));
 
         final Configuration conf = HadoopUtil.getCurrentConfiguration();
         final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 436c55b..e28fa9d 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -190,10 +190,9 @@ public class StreamingBootstrap {
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
 
         int batchInterval = 5 * 60 * 1000;
-        MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
         long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
         logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(streamingConfig.getName(), allClustersData, new CubeStreamConsumer(cubeName), startTimestamp, batchInterval);
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
             @Nullable
             @Override
@@ -261,10 +260,11 @@ public class StreamingBootstrap {
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {
-            final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+            final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(),
                     consumer.getStreamQueue(i % parallelism),
-                    new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
-                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
+                    0L,
+                    iiDesc.getSliceSize());
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 6d660e5..9693771 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -58,7 +58,6 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.streaming.MicroBatchCondition;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
 import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
@@ -214,11 +213,11 @@ public class BuildIIWithStreamTest {
         ToolRunner.run(new IICreateHTableJob(), args);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+        final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName,
                 queue,
-                new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
                 new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
-                0);
+                0,
+                segment.getIIDesc().getSliceSize());
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index ca0d037..8e3eb61 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.job.streaming;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -10,7 +11,6 @@ import org.apache.kylin.cube.CubeBuilder;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.streaming.MicroBatchCondition;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
 import org.junit.Before;
@@ -64,7 +64,9 @@ public class CubeStreamConsumerTest {
     @Test
     public void test() throws Exception {
         LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+        List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+        queues.add(queue);
+        StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         loadDataFromLocalFile(queue, 100000);
         future.get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
new file mode 100644
index 0000000..0fa11c1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -0,0 +1,16 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public interface BatchCondition {
+
+    enum Result {
+        ACCEPT,
+        REJECT,
+        DISCARD
+    }
+
+    Result apply(ParsedStreamMessage message);
+
+    BatchCondition copy();
+}

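The Result values correspond to what StreamFetcher does with each message in the StreamBuilder diff further down: ACCEPT consumes the message from the queue and adds it to the current micro batch, DISCARD consumes it but drops it (used by TimePeriodCondition for messages older than the batch window), and REJECT leaves the message in the queue and closes the batch. A hypothetical implementation, not part of this commit, composing the two conditions added below into a size-capped time window:

    package org.apache.kylin.streaming;

    // Hypothetical example only: closes a batch on either the end of the time window
    // or the message limit, whichever comes first.
    public class SizeBoundedTimeWindowCondition implements BatchCondition {

        private final int limit;
        private final long startTime;
        private final long endTime;
        private final BatchCondition sizeCondition;
        private final BatchCondition timeCondition;

        public SizeBoundedTimeWindowCondition(int limit, long startTime, long endTime) {
            this.limit = limit;
            this.startTime = startTime;
            this.endTime = endTime;
            this.sizeCondition = new LimitedSizeCondition(limit);
            this.timeCondition = new TimePeriodCondition(startTime, endTime);
        }

        @Override
        public Result apply(ParsedStreamMessage message) {
            final Result byTime = timeCondition.apply(message);
            if (byTime != Result.ACCEPT) {
                return byTime;                   // DISCARD late messages, REJECT once the window ends
            }
            return sizeCondition.apply(message); // ACCEPT until the limit is reached, then REJECT
        }

        @Override
        public BatchCondition copy() {
            // stateful because of the embedded counter, so copy() must return a fresh instance
            return new SizeBoundedTimeWindowCondition(limit, startTime, endTime);
        }
    }
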
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
new file mode 100644
index 0000000..3c1e367
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
@@ -0,0 +1,29 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class LimitedSizeCondition implements BatchCondition {
+
+    private final int limit;
+    private int count;
+
+    public LimitedSizeCondition(int limit) {
+        this.limit = limit;
+        this.count = 0;
+    }
+
+    @Override
+    public Result apply(ParsedStreamMessage message) {
+        if (count < limit) {
+            count++;
+            return Result.ACCEPT;
+        } else {
+            return Result.REJECT;
+        }
+    }
+
+    @Override
+    public BatchCondition copy() {
+        return new LimitedSizeCondition(this.limit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
deleted file mode 100644
index baf7b04..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.kylin.streaming;
-
-/**
- */
-public final class MicroBatchCondition {
-
-    private final int batchSize;
-    private final int batchInterval;
-
-    public MicroBatchCondition(int batchSize, int batchInterval) {
-        this.batchSize = batchSize;
-        this.batchInterval = batchInterval;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public int getBatchInterval() {
-        return batchInterval;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cb8dfb1..3da4d79 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -52,7 +52,6 @@ import java.util.concurrent.*;
 public class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
-    private final long startTimestamp;
 
     private StreamParser streamParser = StringStreamParser.instance;
 
@@ -60,26 +59,49 @@ public class StreamBuilder implements Runnable {
 
     private final MicroStreamBatchConsumer consumer;
 
-    private final MicroBatchCondition condition;
-
     private final String streaming;
 
-    public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    private final long startTimestamp;
+
+    private final long batchInterval;
+
+    private final int batchSize;
+
+    public static final StreamBuilder newPeriodicalStreamBuilder(String streaming,
+                                                                 List<BlockingQueue<StreamMessage>> inputs,
+                                                                 MicroStreamBatchConsumer consumer,
+                                                                 long startTimestamp,
+                                                                 long batchInterval) {
+        return new StreamBuilder(streaming, inputs, consumer, startTimestamp, batchInterval);
+    }
+
+    public static final StreamBuilder newLimitedSizeStreamBuilder(String streaming,
+                                                                 BlockingQueue<StreamMessage> input,
+                                                                 MicroStreamBatchConsumer consumer,
+                                                                 long startTimestamp,
+                                                                 int batchSize) {
+        return new StreamBuilder(streaming, input, consumer, startTimestamp, batchSize);
+    }
+
+
+    private StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroStreamBatchConsumer consumer, long startTimestamp, long batchInterval) {
         Preconditions.checkArgument(inputs.size() > 0);
         this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
-        this.condition = condition;
         this.startTimestamp = startTimestamp;
+        this.batchInterval = batchInterval;
+        this.batchSize = -1;
         init(inputs);
     }
 
-    public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+    private StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroStreamBatchConsumer consumer, long startTimestamp, int batchSize) {
         this.streaming = streaming;
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
-        this.condition = condition;
         this.startTimestamp = startTimestamp;
+        this.batchInterval = -1L;
+        this.batchSize = batchSize;
         init(Preconditions.checkNotNull(input));
     }
 
@@ -91,6 +113,14 @@ public class StreamBuilder implements Runnable {
         this.streamMessageQueues.addAll(inputs);
     }
 
+    private BatchCondition generateBatchCondition(long startTimestamp) {
+        if (batchInterval > 0) {
+            return new TimePeriodCondition(startTimestamp, startTimestamp + batchInterval);
+        } else {
+            return new LimitedSizeCondition(batchSize);
+        }
+    }
+
     @Override
     public void run() {
         try {
@@ -106,7 +136,7 @@ public class StreamBuilder implements Runnable {
                 ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
                 int partitionId = 0;
                 for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
-                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+                    futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start))));
                 }
                 countDownLatch.await();
                 ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -125,9 +155,9 @@ public class StreamBuilder implements Runnable {
                         batch = MicroStreamBatch.union(batch, batches.get(i));
                     }
                 }
-                batch.getTimestamp().setFirst(start);
-                batch.getTimestamp().setSecond(start + condition.getBatchInterval());
-                start += condition.getBatchInterval();
+                if (batchInterval > 0) {
+                    start += batchInterval;
+                }
 
                 if (batch.size() == 0) {
                     logger.info("nothing to build, skip to next iteration after sleeping 10s");
@@ -163,15 +193,13 @@ public class StreamBuilder implements Runnable {
         private final BlockingQueue<StreamMessage> streamMessageQueue;
         private final CountDownLatch countDownLatch;
         private final int partitionId;
-        private long startTimestamp;
-        private long endTimestamp;
+        private final BatchCondition condition;
 
-        public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+        public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition) {
             this.partitionId = partitionId;
             this.streamMessageQueue = streamMessageQueue;
             this.countDownLatch = countDownLatch;
-            this.startTimestamp = startTimestamp;
-            this.endTimestamp = endTimestamp;
+            this.condition = condition;
         }
 
         private void clearCounter() {
@@ -228,22 +256,18 @@ public class StreamBuilder implements Runnable {
                         throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
                     }
 
-                    final long timestamp = parsedStreamMessage.getTimestamp();
-                    if (timestamp < startTimestamp) {
-                        //TODO properly handle late megs
-                        streamMessageQueue.take();
-                    } else if (timestamp < endTimestamp) {
-                        streamMessageQueue.take();
-                        if (parsedStreamMessage.isAccepted()) {
+                    final BatchCondition.Result result = condition.apply(parsedStreamMessage);
+                    if (parsedStreamMessage.isAccepted()) {
+                        if (result == BatchCondition.Result.ACCEPT) {
+                            streamMessageQueue.take();
                             microStreamBatch.add(parsedStreamMessage);
-                            if (microStreamBatch.size() >= condition.getBatchSize()) {
-                                return microStreamBatch;
-                            }
-                        } else {
-                            //ignore pruned stream message
+                        } else if (result == BatchCondition.Result.DISCARD) {
+                            streamMessageQueue.take();
+                        } else if (result == BatchCondition.Result.REJECT) {
+                            return microStreamBatch;
                         }
                     } else {
-                        return microStreamBatch;
+                        streamMessageQueue.take();
                     }
                 }
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
new file mode 100644
index 0000000..fdd35fc
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -0,0 +1,30 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public class TimePeriodCondition implements BatchCondition {
+
+    private final long startTime;
+    private final long endTime;
+
+    public TimePeriodCondition(long startTime, long endTime) {
+        this.startTime = startTime;
+        this.endTime = endTime;
+
+    }
+    @Override
+    public Result apply(ParsedStreamMessage message) {
+        if (message.getTimestamp() < startTime) {
+            return Result.DISCARD;
+        } else if (message.getTimestamp() < endTime) {
+            return Result.ACCEPT;
+        } else {
+            return Result.REJECT;
+        }
+    }
+
+    @Override
+    public BatchCondition copy() {
+        return this;
+    }
+}
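
Taken together, StreamBuilder.generateBatchCondition(start) now picks between these two conditions: periodical builders get a TimePeriodCondition covering [start, start + batchInterval), while limited-size builders get a LimitedSizeCondition(batchSize). A minimal sketch of the per-message decisions each condition makes; the timestamps and limit below are made up for illustration, and no ParsedStreamMessage is constructed because its constructor is not part of this diff.

    import org.apache.kylin.streaming.BatchCondition;
    import org.apache.kylin.streaming.LimitedSizeCondition;
    import org.apache.kylin.streaming.TimePeriodCondition;

    public class BatchConditionSemantics {
        public static void main(String[] args) {
            // Window [1000, 2000): timestamps below 1000 -> DISCARD (late message, consumed and dropped),
            // inside the window -> ACCEPT, at or beyond 2000 -> REJECT (batch closed, message stays queued).
            // copy() returns this, since the condition keeps no per-batch state.
            BatchCondition window = new TimePeriodCondition(1000L, 2000L);

            // Limit of 3: the first three apply() calls return ACCEPT, every later call returns REJECT;
            // this condition never DISCARDs, and copy() returns a fresh instance because it keeps a counter.
            BatchCondition sizeCap = new LimitedSizeCondition(3);
        }
    }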