You are viewing a plain-text version of this content; the hyperlink to the canonical version ("here") is not preserved in this rendering.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/05 06:35:51 UTC
[3/3] incubator-kylin git commit: refactor
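refactor: replace MicroBatchCondition with a pluggable BatchCondition (TimePeriodCondition, LimitedSizeCondition), add StreamBuilder factory methods (newPeriodicalStreamBuilder, newLimitedSizeStreamBuilder), and log sampling time in CubeStreamConsumer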
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4f7b1bff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4f7b1bff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4f7b1bff
Branch: refs/heads/0.8.0
Commit: 4f7b1bffa31773e434258db7532d6eb0944b94fe
Parents: 11e9d3e
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 5 12:32:15 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jun 5 12:32:15 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/CubeStreamConsumer.java | 2 +
.../kylin/job/streaming/StreamingBootstrap.java | 10 +--
.../apache/kylin/job/BuildIIWithStreamTest.java | 7 +-
.../job/streaming/CubeStreamConsumerTest.java | 6 +-
.../apache/kylin/streaming/BatchCondition.java | 16 ++++
.../kylin/streaming/LimitedSizeCondition.java | 29 +++++++
.../kylin/streaming/MicroBatchCondition.java | 22 ------
.../apache/kylin/streaming/StreamBuilder.java | 82 +++++++++++++-------
.../kylin/streaming/TimePeriodCondition.java | 30 +++++++
9 files changed, 142 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index bd7c6cb..03dd92a 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -96,7 +96,9 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getSecond(), false, false);
+ long start = System.currentTimeMillis();
final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
+ logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));
final Configuration conf = HadoopUtil.getCurrentConfiguration();
final Path outputPath = new Path("file:///tmp/cuboidstatistics/" + UUID.randomUUID().toString());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 436c55b..e28fa9d 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -190,10 +190,9 @@ public class StreamingBootstrap {
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
int batchInterval = 5 * 60 * 1000;
- MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
logger.info("batch time interval is {} to {}", DateFormat.formatToTimeStr(startTimestamp), DateFormat.formatToTimeStr(startTimestamp + batchInterval));
- StreamBuilder cubeStreamBuilder = new StreamBuilder(streamingConfig.getName(), allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
+ StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(streamingConfig.getName(), allClustersData, new CubeStreamConsumer(cubeName), startTimestamp, batchInterval);
cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
@Nullable
@Override
@@ -261,10 +260,11 @@ public class StreamingBootstrap {
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
- final StreamBuilder task = new StreamBuilder(streamingConfig.getName(),
+ final StreamBuilder task = StreamBuilder.newLimitedSizeStreamBuilder(streamingConfig.getName(),
consumer.getStreamQueue(i % parallelism),
- new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
- new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i), 0L);
+ new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
+ 0L,
+ iiDesc.getSliceSize());
task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
if (i == endShard - 1) {
streamingBuilderPool.submit(task).get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 6d660e5..9693771 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -58,7 +58,6 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.streaming.MicroBatchCondition;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
@@ -214,11 +213,11 @@ public class BuildIIWithStreamTest {
ToolRunner.run(new IICreateHTableJob(), args);
ExecutorService executorService = Executors.newSingleThreadExecutor();
- final StreamBuilder streamBuilder = new StreamBuilder(iiName,
+ final StreamBuilder streamBuilder = StreamBuilder.newLimitedSizeStreamBuilder(iiName,
queue,
- new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
- 0);
+ 0,
+ segment.getIIDesc().getSliceSize());
List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
int count = sorted.size();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index ca0d037..8e3eb61 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -1,5 +1,6 @@
package org.apache.kylin.job.streaming;
+import com.google.common.collect.Lists;
import org.apache.hadoop.util.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -10,7 +11,6 @@ import org.apache.kylin.cube.CubeBuilder;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.DeployUtil;
-import org.apache.kylin.streaming.MicroBatchCondition;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.junit.Before;
@@ -64,7 +64,9 @@ public class CubeStreamConsumerTest {
@Test
public void test() throws Exception {
LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
- StreamBuilder cubeStreamBuilder = new StreamBuilder(CUBE_NAME, queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
+ List<BlockingQueue<StreamMessage>> queues = Lists.newArrayList();
+ queues.add(queue);
+ StreamBuilder cubeStreamBuilder = StreamBuilder.newPeriodicalStreamBuilder(CUBE_NAME, queues, new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis(), 30L * 1000);
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
loadDataFromLocalFile(queue, 100000);
future.get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
new file mode 100644
index 0000000..0fa11c1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -0,0 +1,37 @@
+package org.apache.kylin.streaming;
+
+/**
+ * Decides, message by message, whether a parsed stream message belongs to the
+ * micro batch currently being assembled.
+ *
+ * <p>Implementations may be stateful (for example, counting accepted messages),
+ * so an instance should be used for a single batch only; see {@link #copy()}.
+ */
+public interface BatchCondition {
+
+    /** Outcome of {@link BatchCondition#apply(ParsedStreamMessage)}. */
+    enum Result {
+        /** The message belongs to the current batch and should be consumed into it. */
+        ACCEPT,
+        /** The current batch is complete; the message should not be consumed. */
+        REJECT,
+        /** The message should be consumed and dropped without ending the current batch. */
+        DISCARD
+    }
+
+    /**
+     * Classifies {@code message} against this condition.
+     *
+     * @param message the parsed message to classify
+     * @return how the caller should treat {@code message}
+     */
+    Result apply(ParsedStreamMessage message);
+
+    /**
+     * Returns a condition with the same configuration, for use with another batch.
+     * Stateful implementations should return a fresh instance rather than {@code this}.
+     *
+     * @return an equivalent condition
+     */
+    BatchCondition copy();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
new file mode 100644
index 0000000..3c1e367
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/LimitedSizeCondition.java
@@ -0,0 +1,49 @@
+package org.apache.kylin.streaming;
+
+/**
+ * A {@link BatchCondition} that accepts at most {@code limit} messages and rejects
+ * every message after that, signalling that the current batch is complete.
+ *
+ * <p>This condition is stateful: it counts the messages it has accepted. Use
+ * {@link #copy()} to obtain a fresh instance (with the count reset) for another batch.
+ * The counter is not synchronized, so an instance must not be shared between threads.
+ */
+public class LimitedSizeCondition implements BatchCondition {
+
+    private final int limit;
+    /** Number of messages accepted so far. */
+    private int count;
+
+    /**
+     * @param limit maximum number of messages to accept; must be positive
+     * @throws IllegalArgumentException if {@code limit} is not positive
+     */
+    public LimitedSizeCondition(int limit) {
+        // A non-positive limit would reject every message, so each batch would end
+        // empty without consuming anything; fail fast instead.
+        if (limit <= 0) {
+            throw new IllegalArgumentException("limit must be positive, but was " + limit);
+        }
+        this.limit = limit;
+        this.count = 0;
+    }
+
+    /**
+     * Accepts the message if fewer than {@code limit} messages have been accepted so far,
+     * otherwise rejects it. The message content is not inspected.
+     */
+    @Override
+    public Result apply(ParsedStreamMessage message) {
+        if (count < limit) {
+            count++;
+            return Result.ACCEPT;
+        }
+        return Result.REJECT;
+    }
+
+    /** Returns a new condition with the same limit and a zero count. */
+    @Override
+    public BatchCondition copy() {
+        return new LimitedSizeCondition(this.limit);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
deleted file mode 100644
index baf7b04..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroBatchCondition.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.kylin.streaming;
-
-/**
- */
-public final class MicroBatchCondition {
-
- private final int batchSize;
- private final int batchInterval;
-
- public MicroBatchCondition(int batchSize, int batchInterval) {
- this.batchSize = batchSize;
- this.batchInterval = batchInterval;
- }
-
- public int getBatchSize() {
- return batchSize;
- }
-
- public int getBatchInterval() {
- return batchInterval;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index cb8dfb1..3da4d79 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -52,7 +52,6 @@ import java.util.concurrent.*;
public class StreamBuilder implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
- private final long startTimestamp;
private StreamParser streamParser = StringStreamParser.instance;
@@ -60,26 +59,49 @@ public class StreamBuilder implements Runnable {
private final MicroStreamBatchConsumer consumer;
- private final MicroBatchCondition condition;
-
private final String streaming;
- public StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ private final long startTimestamp;
+
+ private final long batchInterval;
+
+ private final int batchSize;
+
+ public static final StreamBuilder newPeriodicalStreamBuilder(String streaming,
+ List<BlockingQueue<StreamMessage>> inputs,
+ MicroStreamBatchConsumer consumer,
+ long startTimestamp,
+ long batchInterval) {
+ return new StreamBuilder(streaming, inputs, consumer, startTimestamp, batchInterval);
+ }
+
+ public static final StreamBuilder newLimitedSizeStreamBuilder(String streaming,
+ BlockingQueue<StreamMessage> input,
+ MicroStreamBatchConsumer consumer,
+ long startTimestamp,
+ int batchSize) {
+ return new StreamBuilder(streaming, input, consumer, startTimestamp, batchSize);
+ }
+
+
+ private StreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> inputs, MicroStreamBatchConsumer consumer, long startTimestamp, long batchInterval) {
Preconditions.checkArgument(inputs.size() > 0);
this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
- this.condition = condition;
this.startTimestamp = startTimestamp;
+ this.batchInterval = batchInterval;
+ this.batchSize = -1;
init(inputs);
}
- public StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
+ private StreamBuilder(String streaming, BlockingQueue<StreamMessage> input, MicroStreamBatchConsumer consumer, long startTimestamp, int batchSize) {
this.streaming = streaming;
this.streamMessageQueues = Lists.newArrayList();
this.consumer = Preconditions.checkNotNull(consumer);
- this.condition = condition;
this.startTimestamp = startTimestamp;
+ this.batchInterval = -1L;
+ this.batchSize = batchSize;
init(Preconditions.checkNotNull(input));
}
@@ -91,6 +113,14 @@ public class StreamBuilder implements Runnable {
this.streamMessageQueues.addAll(inputs);
}
+ private BatchCondition generateBatchCondition(long startTimestamp) {
+ if (batchInterval > 0) {
+ return new TimePeriodCondition(startTimestamp, startTimestamp + batchInterval);
+ } else {
+ return new LimitedSizeCondition(batchSize);
+ }
+ }
+
@Override
public void run() {
try {
@@ -106,7 +136,7 @@ public class StreamBuilder implements Runnable {
ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
int partitionId = 0;
for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
- futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
+ futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start))));
}
countDownLatch.await();
ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -125,9 +155,9 @@ public class StreamBuilder implements Runnable {
batch = MicroStreamBatch.union(batch, batches.get(i));
}
}
- batch.getTimestamp().setFirst(start);
- batch.getTimestamp().setSecond(start + condition.getBatchInterval());
- start += condition.getBatchInterval();
+ if (batchInterval > 0) {
+ start += batchInterval;
+ }
if (batch.size() == 0) {
logger.info("nothing to build, skip to next iteration after sleeping 10s");
@@ -163,15 +193,13 @@ public class StreamBuilder implements Runnable {
private final BlockingQueue<StreamMessage> streamMessageQueue;
private final CountDownLatch countDownLatch;
private final int partitionId;
- private long startTimestamp;
- private long endTimestamp;
+ private final BatchCondition condition;
- public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
+ public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition) {
this.partitionId = partitionId;
this.streamMessageQueue = streamMessageQueue;
this.countDownLatch = countDownLatch;
- this.startTimestamp = startTimestamp;
- this.endTimestamp = endTimestamp;
+ this.condition = condition;
}
private void clearCounter() {
@@ -228,22 +256,18 @@ public class StreamBuilder implements Runnable {
throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
}
- final long timestamp = parsedStreamMessage.getTimestamp();
- if (timestamp < startTimestamp) {
- //TODO properly handle late megs
- streamMessageQueue.take();
- } else if (timestamp < endTimestamp) {
- streamMessageQueue.take();
- if (parsedStreamMessage.isAccepted()) {
+ final BatchCondition.Result result = condition.apply(parsedStreamMessage);
+ if (parsedStreamMessage.isAccepted()) {
+ if (result == BatchCondition.Result.ACCEPT) {
+ streamMessageQueue.take();
microStreamBatch.add(parsedStreamMessage);
- if (microStreamBatch.size() >= condition.getBatchSize()) {
- return microStreamBatch;
- }
- } else {
- //ignore pruned stream message
+ } else if (result == BatchCondition.Result.DISCARD) {
+ streamMessageQueue.take();
+ } else if (result == BatchCondition.Result.REJECT) {
+ return microStreamBatch;
}
} else {
- return microStreamBatch;
+ streamMessageQueue.take();
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4f7b1bff/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
new file mode 100644
index 0000000..fdd35fc
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -0,0 +1,49 @@
+package org.apache.kylin.streaming;
+
+/**
+ * A {@link BatchCondition} that selects messages whose timestamp lies in the
+ * half-open interval {@code [startTime, endTime)}.
+ *
+ * <p>Messages older than {@code startTime} are discarded, messages inside the
+ * interval are accepted, and messages at or after {@code endTime} are rejected,
+ * signalling that the batch for this period is complete.
+ *
+ * <p>The condition holds no mutable state, so {@link #copy()} returns {@code this}.
+ */
+public class TimePeriodCondition implements BatchCondition {
+
+    private final long startTime;
+    private final long endTime;
+
+    /**
+     * @param startTime inclusive lower bound of the period
+     * @param endTime exclusive upper bound of the period; must be greater than {@code startTime}
+     * @throws IllegalArgumentException if {@code endTime <= startTime}
+     */
+    public TimePeriodCondition(long startTime, long endTime) {
+        // An empty or inverted period would accept nothing, so every batch would end empty.
+        if (endTime <= startTime) {
+            throw new IllegalArgumentException("endTime (" + endTime + ") must be greater than startTime (" + startTime + ")");
+        }
+        this.startTime = startTime;
+        this.endTime = endTime;
+    }
+
+    @Override
+    public Result apply(ParsedStreamMessage message) {
+        final long timestamp = message.getTimestamp();
+        if (timestamp < startTime) {
+            return Result.DISCARD;
+        } else if (timestamp < endTime) {
+            return Result.ACCEPT;
+        } else {
+            return Result.REJECT;
+        }
+    }
+
+    /** Returns {@code this}; the condition is immutable and can be reused across batches. */
+    @Override
+    public BatchCondition copy() {
+        return this;
+    }
+}