You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/08 13:32:30 UTC
incubator-kylin git commit: KYLIN-820
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 bdde514c3 -> 8b60283a4
KYLIN-820
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8b60283a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8b60283a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8b60283a
Branch: refs/heads/0.8.0
Commit: 8b60283a47f5494e4422704c8c6ca3a878eaca1f
Parents: bdde514
Author: qianhao.zhou <qi...@ebay.com>
Authored: Mon Jun 8 18:24:01 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon Jun 8 19:33:13 2015 +0800
----------------------------------------------------------------------
bin/kylin.sh | 4 +-
.../kylin/job/streaming/BootstrapConfig.java | 34 ++++++
.../kylin/job/streaming/CubeStreamConsumer.java | 2 +-
.../kylin/job/streaming/StreamingBootstrap.java | 77 ++++++++++----
.../kylin/job/streaming/StreamingCLI.java | 36 +++++--
.../kylin/streaming/OneOffStreamBuilder.java | 83 +++++++++++++++
.../apache/kylin/streaming/StreamBuilder.java | 101 ++----------------
.../apache/kylin/streaming/StreamFetcher.java | 104 +++++++++++++++++++
.../apache/kylin/streaming/StreamingUtil.java | 82 +++++++++++++++
.../kylin/streaming/TimePeriodCondition.java | 8 ++
10 files changed, 401 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/bin/kylin.sh b/bin/kylin.sh
index a179fc8..4bbecfc 100644
--- a/bin/kylin.sh
+++ b/bin/kylin.sh
@@ -81,7 +81,7 @@ then
exit 0
elif [ $1 == "streaming" ]
then
- if [ $# != 4 ]
+ if [ $# -lt 4 ]
then
echo 'invalid input args'
exit -1
@@ -112,7 +112,7 @@ then
-Dkylin.hive.dependency=${hive_dependency} \
-Dkylin.hbase.dependency=${hbase_dependency} \
-Dspring.profiles.active=${spring_profile} \
- org.apache.kylin.job.streaming.StreamingCLI start $3 $4 > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/$3_$4 &
+ org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/$3_$4 &
echo "streaming started $3 partition $4"
exit 0
elif [ $2 == "stop" ]
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
new file mode 100644
index 0000000..5030b66
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.job.streaming;
+
+/**
+ */
/**
 * Holds the command-line options that control how a streaming build is
 * bootstrapped: whether it is a one-off (bounded) build and, if so, the
 * time window it should cover.
 */
public class BootstrapConfig {

    // true => run a single bounded build instead of a continuous one
    private boolean oneOff = false;
    // window boundaries, presumably epoch milliseconds (parsed from the
    // -start/-end CLI flags) — 0 means "not set"
    private long start = 0L;
    private long end = 0L;

    public boolean isOneOff() {
        return this.oneOff;
    }

    public void setOneOff(boolean oneOff) {
        this.oneOff = oneOff;
    }

    public long getStart() {
        return this.start;
    }

    public void setStart(long start) {
        this.start = start;
    }

    public long getEnd() {
        return this.end;
    }

    public void setEnd(long end) {
        this.end = end;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 03dd92a..98435cb 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -95,7 +95,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
- final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getSecond(), false, false);
+ final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getFirst(), microStreamBatch.getTimestamp().getSecond(), false, false);
long start = System.currentTimeMillis();
final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
logger.info(String.format("sampling of %d messages cost %d ms", parsedStreamMessages.size(), (System.currentTimeMillis() - start)));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index bcc1b97..e6ce350 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -40,7 +40,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
-import kafka.javaapi.PartitionMetadata;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.HBaseConnection;
@@ -63,6 +62,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -97,15 +97,6 @@ public class StreamingBootstrap {
return bootstrap;
}
- private static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
- final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
- if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
- return partitionMetadata.leader();
- } else {
- return null;
- }
- }
-
public void stop() {
for (KafkaConsumer consumer : kafkaConsumers.values()) {
consumer.stop();
@@ -123,24 +114,34 @@ public class StreamingBootstrap {
return result;
}
- public void start(String streaming, int partitionId) throws Exception {
+ public void start(String streaming, int partitionId, BootstrapConfig bootstrapConfig) throws Exception {
final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
startIIStreaming(streamingConfig, partitionId);
} else if (!StringUtils.isEmpty(streamingConfig.getCubeName())) {
- startCubeStreaming(streamingConfig, partitionId);
+ if (bootstrapConfig.isOneOff()) {
+ Preconditions.checkArgument(bootstrapConfig.getStart() != 0);
+ Preconditions.checkArgument(bootstrapConfig.getEnd() != 0);
+ startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+ } else {
+ startCubeStreaming(streamingConfig, partitionId);
+ }
} else {
throw new IllegalArgumentException("no cube or ii in kafka config");
}
}
- private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionOffset) {
+ public void start(String streaming, int partitionId) throws Exception {
+ start(streaming, partitionId, new BootstrapConfig());
+ }
+
+ private List<BlockingQueue<StreamMessage>> consume(int clusterID, KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionIdOffset) {
List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
- final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
- final int transferredPartitionId = partitionId + partitionOffset;
+ final Broker leadBroker = StreamingUtil.getLeadBroker(kafkaClusterConfig, partitionId);
+ final int transferredPartitionId = partitionId + partitionIdOffset;
final long latestOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
long streamingOffset = latestOffset;
if (partitionIdOffsetMap.containsKey(transferredPartitionId)) {
@@ -168,27 +169,27 @@ public class StreamingBootstrap {
final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
ArrayList<Integer> allPartitions = Lists.newArrayList();
- int partitionOffset = 0;
+ int partitionIdOffset = 0;
for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
for (int i = 0; i < partitionCount; i++) {
- allPartitions.add(i + partitionOffset);
+ allPartitions.add(i + partitionIdOffset);
}
- partitionOffset += partitionCount;
+ partitionIdOffset += partitionCount;
}
final Map<Integer, Long> partitionIdOffsetMap = streamingManager.getOffset(streamingConfig.getName(), allPartitions);
int clusterID = 0;
- partitionOffset = 0;
+ partitionIdOffset = 0;
for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
- final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionOffset);
+ final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionIdOffset);
logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
allClustersData.addAll(oneClusterData);
clusterID++;
- partitionOffset += partitionCount;
+ partitionIdOffset += partitionCount;
}
final String cubeName = streamingConfig.getCubeName();
@@ -219,6 +220,38 @@ public class StreamingBootstrap {
}
}
+ private void startOneOffCubeStreaming(StreamingConfig streamingConfig, long startTimestamp, long endTimestamp) throws Exception {
+ final String cubeName = streamingConfig.getCubeName();
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final StreamParser streamParser = getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
+ @Nullable
+ @Override
+ public TblColRef apply(IntermediateColumnDesc input) {
+ return input.getColRef();
+ }
+ }));
+ final int batchInterval = 5 * 60 * 1000;
+ startTimestamp = TimeUtil.getNextPeriodStart(startTimestamp, batchInterval);
+ endTimestamp = TimeUtil.getNextPeriodStart(endTimestamp, batchInterval);
+ final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
+
+ int clusterId = 0;
+ for (KafkaClusterConfig kafkaClusterConfig : streamingConfig.getKafkaClusterConfigs()) {
+ HashMap<Integer, Long> partitionIdOffsetMap = Maps.newHashMap();
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ for (int i = 0; i < partitionCount; ++i) {
+ partitionIdOffsetMap.put(i, StreamingUtil.findClosestOffsetWithDataTimestamp(kafkaClusterConfig, i, startTimestamp, streamParser));
+ }
+ final List<BlockingQueue<StreamMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, 0);
+ queues.addAll(oneClusterQueue);
+ logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+ }
+
+ logger.info(String.format("starting one off streaming build with timestamp{%d, %d}", startTimestamp, endTimestamp));
+ OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp);
+ Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
+ }
+
private void startIIStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
List<KafkaClusterConfig> allClustersConfigs = streamingConfig.getKafkaClusterConfigs();
@@ -236,7 +269,7 @@ public class StreamingBootstrap {
Preconditions.checkArgument(ii.getSegments().size() > 0);
final IISegment iiSegment = ii.getSegments().get(0);
- final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
+ final Broker leadBroker = StreamingUtil.getLeadBroker(kafkaClusterConfig, partitionId);
Preconditions.checkState(leadBroker != null, "cannot find lead broker");
final int shard = ii.getDescriptor().getSharding();
Preconditions.checkArgument(shard % partitionCount == 0);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index ea2b2a5..c120015 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -34,6 +34,7 @@
package org.apache.kylin.job.streaming;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.cache.RemoteCacheUpdater;
@@ -49,20 +50,33 @@ public class StreamingCLI {
public static void main(String[] args) {
try {
- if (args.length != 3) {
- printArgsError(args);
- return;
- }
-
AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
- if (args[0].equals("start")) {
- String kafkaConfName = args[1];
- int partition = Integer.parseInt(args[2]);
- StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, partition);
- } else {
- printArgsError(args);
+ Preconditions.checkArgument(args[0].equals("streaming"));
+ Preconditions.checkArgument(args[1].equals("start"));
+
+ String kafkaConfName = args[2];
+ int partition = Integer.parseInt(args[3]);
+ int i = 4;
+ BootstrapConfig bootstrapConfig = new BootstrapConfig();
+ while (i < args.length) {
+ String argName = args[i];
+ switch (argName) {
+ case "-oneoff":
+ bootstrapConfig.setOneOff(Boolean.parseBoolean(args[i + 1]));
+ break;
+ case "-start":
+ bootstrapConfig.setStart(Long.parseLong(args[i + 1]));
+ break;
+ case "-end":
+ bootstrapConfig.setEnd(Long.parseLong(args[i + 1]));
+ break;
+ default:
+ throw new RuntimeException("invalid argName:" + argName);
+ }
+ i += 2;
}
+ StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, partition, bootstrapConfig);
} catch (Exception e) {
printArgsError(args);
logger.error("error start streaming", e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
new file mode 100644
index 0000000..2c062cc
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.DateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ */
+public class OneOffStreamBuilder implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(OneOffStreamBuilder.class);
+
+
+ private final String streaming;
+ private final List<BlockingQueue<StreamMessage>> queues;
+ private final MicroStreamBatchConsumer consumer;
+ private final TimePeriodCondition batchCondition;
+ private StreamParser streamParser;
+
+ public OneOffStreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, MicroStreamBatchConsumer consumer, long startTime, long endTime) {
+ Preconditions.checkArgument(queues.size() > 0);
+ this.batchCondition = new TimePeriodCondition(startTime, endTime);
+ this.streaming = streaming;
+ this.queues = queues;
+ this.consumer = Preconditions.checkNotNull(consumer);
+ this.streamParser = streamParser;
+ }
+
+ @Override
+ public void run() {
+ try {
+ final int inputCount = queues.size();
+ final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
+ final CountDownLatch countDownLatch = new CountDownLatch(inputCount);
+ final List<Future<MicroStreamBatch>> futures = Lists.newLinkedList();
+ int partitionId = 0;
+ for (BlockingQueue<StreamMessage> queue : queues) {
+ futures.add(executorService.submit(new StreamFetcher(partitionId, queue, countDownLatch, batchCondition, streamParser)));
+ }
+ countDownLatch.await();
+ List<MicroStreamBatch> batches = Lists.newLinkedList();
+ for (Future<MicroStreamBatch> future : futures) {
+ if (future.get() != null) {
+ batches.add(future.get());
+ } else {
+ logger.warn("EOF encountered, stop streaming");
+ }
+ }
+
+ MicroStreamBatch batch = batches.get(0);
+ if (batches.size() > 1) {
+ for (int i = 1; i < inputCount; i++) {
+ if (batches.get(i).size() > 0) {
+ batch = MicroStreamBatch.union(batch, batches.get(i));
+ }
+ }
+ }
+ batch.getTimestamp().setFirst(batchCondition.getStartTime());
+ batch.getTimestamp().setSecond(batchCondition.getEndTime());
+
+ logger.info("Consuming {} messages, covering from {} to {}", new String[] { String.valueOf(batch.size()), DateFormat.formatToTimeStr(batch.getTimestamp().getFirst()), DateFormat.formatToTimeStr(batch.getTimestamp().getSecond()) });
+ long startTime = System.currentTimeMillis();
+ consumer.consume(batch);
+ logger.info("Batch build costs {} milliseconds", System.currentTimeMillis() - startTime);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("this thread should not be interrupted", ie);
+ } catch (ExecutionException ee) {
+ logger.error("fetch stream error", ee);
+ throw new RuntimeException(ee);
+ } catch (Exception e) {
+ logger.error("error consume batch", e);
+ throw new RuntimeException("error consume batch", e);
+ }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 76e58cf..d97d86c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -136,7 +136,7 @@ public class StreamBuilder implements Runnable {
ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
int partitionId = 0;
for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
- futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start))));
+ futures.add(executorService.submit(new StreamFetcher(partitionId++, streamMessageQueue, countDownLatch, generateBatchCondition(start), getStreamParser())));
}
countDownLatch.await();
ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -152,10 +152,15 @@ public class StreamBuilder implements Runnable {
MicroStreamBatch batch = batches.get(0);
if (batches.size() > 1) {
for (int i = 1; i < inputCount; i++) {
- if (batches.get(i).size() > 0)
+ if (batches.get(i).size() > 0) {
batch = MicroStreamBatch.union(batch, batches.get(i));
+ }
}
}
+ if (batches.size() > 1) {
+ batch.getTimestamp().setFirst(start);
+ batch.getTimestamp().setSecond(start + batchInterval);
+ }
if (batchInterval > 0) {
start += batchInterval;
}
@@ -189,98 +194,6 @@ public class StreamBuilder implements Runnable {
}
}
- private class StreamFetcher implements Callable<MicroStreamBatch> {
-
- private final BlockingQueue<StreamMessage> streamMessageQueue;
- private final CountDownLatch countDownLatch;
- private final int partitionId;
- private final BatchCondition condition;
-
- public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition) {
- this.partitionId = partitionId;
- this.streamMessageQueue = streamMessageQueue;
- this.countDownLatch = countDownLatch;
- this.condition = condition;
- }
-
- private void clearCounter() {
- }
-
- private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
- long t = System.currentTimeMillis();
- while (true) {
- final StreamMessage peek = queue.peek();
- if (peek != null) {
- return peek;
- } else {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.warn("stream queue should not be interrupted", e);
- return null;
- }
- if (System.currentTimeMillis() - t > timeout) {
- break;
- }
- }
- }
- return queue.peek();
- }
-
- @Override
- public MicroStreamBatch call() throws Exception {
- try {
- MicroStreamBatch microStreamBatch = null;
- while (true) {
- if (microStreamBatch == null) {
- microStreamBatch = new MicroStreamBatch(partitionId);
- clearCounter();
- }
- StreamMessage streamMessage = peek(streamMessageQueue, 30000);
- if (streamMessage == null) {
- logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
- if (!microStreamBatch.isEmpty()) {
- return microStreamBatch;
- } else {
- continue;
- }
- }
- if (streamMessage.getOffset() < 0) {
- consumer.stop();
- logger.warn("streaming encountered EOF, stop building");
- return null;
- }
-
- microStreamBatch.incRawMessageCount();
- final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
- if (parsedStreamMessage == null) {
- throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
- }
-
- final BatchCondition.Result result = condition.apply(parsedStreamMessage);
- if (parsedStreamMessage.isAccepted()) {
- if (result == BatchCondition.Result.ACCEPT) {
- streamMessageQueue.take();
- microStreamBatch.add(parsedStreamMessage);
- } else if (result == BatchCondition.Result.DISCARD) {
- streamMessageQueue.take();
- } else if (result == BatchCondition.Result.REJECT) {
- return microStreamBatch;
- }
- } else {
- streamMessageQueue.take();
- }
- }
- } catch (Exception e) {
- logger.error("build stream error, stop building", e);
- throw new RuntimeException("build stream error, stop building", e);
- } finally {
- logger.info("one partition sign off");
- countDownLatch.countDown();
- }
- }
- }
-
public final StreamParser getStreamParser() {
return streamParser;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
new file mode 100644
index 0000000..4f29610
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFetcher.java
@@ -0,0 +1,104 @@
+package org.apache.kylin.streaming;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ */
+public class StreamFetcher implements Callable<MicroStreamBatch> {
+ private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+
+ private final BlockingQueue<StreamMessage> streamMessageQueue;
+ private final CountDownLatch countDownLatch;
+ private final int partitionId;
+ private final BatchCondition condition;
+ private final StreamParser streamParser;
+
+ public StreamFetcher(int partitionId, BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, BatchCondition condition, StreamParser streamParser) {
+ this.partitionId = partitionId;
+ this.streamMessageQueue = streamMessageQueue;
+ this.countDownLatch = countDownLatch;
+ this.condition = condition;
+ this.streamParser = streamParser;
+ }
+
+ private void clearCounter() {
+ }
+
+ private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
+ long t = System.currentTimeMillis();
+ while (true) {
+ final StreamMessage peek = queue.peek();
+ if (peek != null) {
+ return peek;
+ } else {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.warn("stream queue should not be interrupted", e);
+ return null;
+ }
+ if (System.currentTimeMillis() - t > timeout) {
+ break;
+ }
+ }
+ }
+ return queue.peek();
+ }
+
+ @Override
+ public MicroStreamBatch call() throws Exception {
+ try {
+ MicroStreamBatch microStreamBatch = null;
+ while (true) {
+ if (microStreamBatch == null) {
+ microStreamBatch = new MicroStreamBatch(partitionId);
+ clearCounter();
+ }
+ StreamMessage streamMessage = peek(streamMessageQueue, 30000);
+ if (streamMessage == null) {
+ logger.info("The stream queue is drained, current available stream count: " + microStreamBatch.size());
+ if (!microStreamBatch.isEmpty()) {
+ return microStreamBatch;
+ } else {
+ continue;
+ }
+ }
+ if (streamMessage.getOffset() < 0) {
+ logger.warn("streaming encountered EOF, stop building");
+ return null;
+ }
+
+ microStreamBatch.incRawMessageCount();
+ final ParsedStreamMessage parsedStreamMessage = streamParser.parse(streamMessage);
+ if (parsedStreamMessage == null) {
+ throw new RuntimeException("parsedStreamMessage of " + new String(streamMessage.getRawData()) + " is null");
+ }
+
+ final BatchCondition.Result result = condition.apply(parsedStreamMessage);
+ if (parsedStreamMessage.isAccepted()) {
+ if (result == BatchCondition.Result.ACCEPT) {
+ streamMessageQueue.take();
+ microStreamBatch.add(parsedStreamMessage);
+ } else if (result == BatchCondition.Result.DISCARD) {
+ streamMessageQueue.take();
+ } else if (result == BatchCondition.Result.REJECT) {
+ return microStreamBatch;
+ }
+ } else {
+ streamMessageQueue.take();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("build stream error, stop building", e);
+ throw new RuntimeException("build stream error, stop building", e);
+ } finally {
+ logger.info("one partition sign off");
+ countDownLatch.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
new file mode 100644
index 0000000..2a693a6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -0,0 +1,82 @@
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ */
+public final class StreamingUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamingUtil.class);
+
+ private StreamingUtil(){}
+
+ public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) {
+ final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
+ if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+ return partitionMetadata.leader();
+ } else {
+ return null;
+ }
+ }
+
+ public static long getDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset, StreamParser streamParser) {
+ final String topic = kafkaClusterConfig.getTopic();
+ final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
+ final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaClusterConfig);
+ Preconditions.checkArgument(response.errorCode(topic, partitionId) == 0, "errorCode of FetchResponse is:" + response.errorCode(topic, partitionId));
+ final MessageAndOffset messageAndOffset = response.messageSet(topic, partitionId).iterator().next();
+ final ByteBuffer payload = messageAndOffset.message().payload();
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ final ParsedStreamMessage parsedStreamMessage = streamParser.parse(new StreamMessage(messageAndOffset.offset(), bytes));
+ return parsedStreamMessage.getTimestamp();
+ }
+
+ public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
+ final String topic = kafkaClusterConfig.getTopic();
+ final Broker leadBroker = Preconditions.checkNotNull(getLeadBroker(kafkaClusterConfig, partitionId), "unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
+ final long earliestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaClusterConfig);
+ final long latestOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaClusterConfig);
+ logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, earliestOffset, latestOffset));
+ return binarySearch(kafkaClusterConfig, partitionId, earliestOffset, latestOffset, timestamp, streamParser);
+ }
+
+ private static long binarySearch(KafkaClusterConfig kafkaClusterConfig, int partitionId, long startOffset, long endOffset, long targetTimestamp, StreamParser streamParser) {
+ while (startOffset <= endOffset + 2) {
+ long midOffset = startOffset + ((endOffset - startOffset) >> 1);
+ long startTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, startOffset, streamParser);
+ long endTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, endOffset, streamParser);
+ long midTimestamp = getDataTimestamp(kafkaClusterConfig, partitionId, midOffset, streamParser);
+ // hard to ensure these 2 conditions
+// Preconditions.checkArgument(startTimestamp <= midTimestamp);
+// Preconditions.checkArgument(midTimestamp <= endTimestamp);
+ if (startTimestamp > targetTimestamp) {
+ return startOffset;
+ }
+ if (endTimestamp < targetTimestamp) {
+ return endOffset;
+ }
+ if (targetTimestamp == midTimestamp) {
+ return midOffset;
+ } else if (targetTimestamp < midTimestamp) {
+ endOffset = midOffset;
+ continue;
+ } else {
+ startOffset = midOffset;
+ continue;
+ }
+ }
+ return startOffset;
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8b60283a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
index 7752437..d3349d5 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -10,8 +10,16 @@ public class TimePeriodCondition implements BatchCondition {
public TimePeriodCondition(long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+ public long getEndTime() {
+ return endTime;
}
+
@Override
public Result apply(ParsedStreamMessage message) {
if (message.getTimestamp() < startTime) {