You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/10 09:35:27 UTC
incubator-kylin git commit: KYLIN-820 minor change to kylin.sh to
support running OneOffStreamBuilder from crontab
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 a52dfb67a -> 3a31d916a
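KYLIN-820 minor change to kylin.sh to support running OneOffStreamBuilder from crontab. StreamingCLI now takes named options (-streaming, -partition, -margin) instead of a positional streaming name and partition id.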
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3a31d916
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3a31d916
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3a31d916
Branch: refs/heads/0.8.0
Commit: 3a31d916abb0ea7a55a37aa5b278cc1472b655e0
Parents: a52dfb6
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Jun 9 20:07:17 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jun 10 15:33:17 2015 +0800
----------------------------------------------------------------------
bin/kylin.sh | 4 +--
.../kylin/job/streaming/BootstrapConfig.java | 28 +++++++++++++++
.../kylin/job/streaming/StreamingBootstrap.java | 37 +++++++++-----------
.../kylin/job/streaming/StreamingCLI.java | 25 ++++++++-----
.../apache/kylin/streaming/BatchCondition.java | 2 +-
.../kylin/streaming/OneOffStreamBuilder.java | 4 +--
.../kylin/streaming/TimePeriodCondition.java | 13 +++++--
7 files changed, 76 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/bin/kylin.sh b/bin/kylin.sh
index 4bbecfc..97c29c5 100644
--- a/bin/kylin.sh
+++ b/bin/kylin.sh
@@ -83,7 +83,7 @@ elif [ $1 == "streaming" ]
then
if [ $# -lt 4 ]
then
- echo 'invalid input args'
+ echo "invalid input args $@"
exit -1
fi
if [ $2 == "start" ]
@@ -113,7 +113,7 @@ then
-Dkylin.hbase.dependency=${hbase_dependency} \
-Dspring.profiles.active=${spring_profile} \
org.apache.kylin.job.streaming.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/$3_$4 &
- echo "streaming started $3 partition $4"
+ echo "streaming started name: $3 id: $4"
exit 0
elif [ $2 == "stop" ]
then
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
index 5030b66..a82fec3 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -4,9 +4,21 @@ package org.apache.kylin.job.streaming;
*/
public class BootstrapConfig {
+ private String streaming;
+ private int partitionId = -1;
+
private boolean oneOff = false;
private long start = 0L;
private long end = 0L;
+ private long margin = 0L;
+
+ public long getMargin() {
+ return margin;
+ }
+
+ public void setMargin(long margin) {
+ this.margin = margin;
+ }
public boolean isOneOff() {
return oneOff;
@@ -31,4 +43,20 @@ public class BootstrapConfig {
public void setEnd(long end) {
this.end = end;
}
+
+ public String getStreaming() {
+ return streaming;
+ }
+
+ public void setStreaming(String streaming) {
+ this.streaming = streaming;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public void setPartitionId(int partitionId) {
+ this.partitionId = partitionId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 809f454..5bdc670 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -111,19 +111,23 @@ public class StreamingBootstrap {
return result;
}
- public void start(String streaming, int partitionId, BootstrapConfig bootstrapConfig) throws Exception {
+ public void start(BootstrapConfig bootstrapConfig) throws Exception {
+ final String streaming = bootstrapConfig.getStreaming();
+ Preconditions.checkNotNull(streaming, "streaming name cannot be empty");
final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(streaming);
Preconditions.checkArgument(streamingConfig != null, "cannot find kafka config:" + streaming);
if (!StringUtils.isEmpty(streamingConfig.getIiName())) {
+ int partitionId = bootstrapConfig.getPartitionId();
+ Preconditions.checkArgument(partitionId >= 0, "partitionId cannot be empty for inverted index streaming");
startIIStreaming(streamingConfig, partitionId);
} else if (!StringUtils.isEmpty(streamingConfig.getCubeName())) {
if (bootstrapConfig.isOneOff()) {
Preconditions.checkArgument(bootstrapConfig.getStart() != 0);
Preconditions.checkArgument(bootstrapConfig.getEnd() != 0);
- startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+ startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd(), bootstrapConfig.getMargin());
} else {
- startCubeStreaming(streamingConfig, partitionId);
+ startCubeStreaming(streamingConfig);
}
} else {
throw new IllegalArgumentException("no cube or ii in kafka config");
@@ -131,7 +135,10 @@ public class StreamingBootstrap {
}
public void start(String streaming, int partitionId) throws Exception {
- start(streaming, partitionId, new BootstrapConfig());
+ final BootstrapConfig bootstrapConfig = new BootstrapConfig();
+ bootstrapConfig.setPartitionId(partitionId);
+ bootstrapConfig.setStreaming(streaming);
+ start(bootstrapConfig);
}
private List<BlockingQueue<StreamMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, final Map<Integer, Long> partitionIdOffsetMap, final int partitionIdOffset) {
@@ -154,21 +161,13 @@ public class StreamingBootstrap {
}
logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId + " transferredPartitionId:" + transferredPartitionId);
KafkaConsumer consumer = new KafkaConsumer(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, kafkaClusterConfig.getBrokers(), kafkaClusterConfig);
- final String threadName = String.format("%s_%d_%d", kafkaClusterConfig.getTopic(), clusterID, partitionId);
- Executors.newSingleThreadExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- final Thread thread = new Thread(r, threadName);
- thread.setDaemon(true);
- return thread;
- }
- }).submit(consumer);
+ Executors.newSingleThreadExecutor().submit(consumer);
result.add(consumer.getStreamQueue(0));
}
return result;
}
- private void startCubeStreaming(StreamingConfig streamingConfig, final int partitionId) throws Exception {
+ private void startCubeStreaming(StreamingConfig streamingConfig) throws Exception {
List<KafkaClusterConfig> kafkaClusterConfigs = streamingConfig.getKafkaClusterConfigs();
final List<BlockingQueue<StreamMessage>> allClustersData = Lists.newArrayList();
@@ -188,7 +187,6 @@ public class StreamingBootstrap {
partitionIdOffset = 0;
for (KafkaClusterConfig kafkaClusterConfig : kafkaClusterConfigs) {
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
- Preconditions.checkArgument(partitionId >= 0 && partitionId < partitionCount, "invalid partition id:" + partitionId);
final List<BlockingQueue<StreamMessage>> oneClusterData = consume(clusterID, kafkaClusterConfig, partitionCount, partitionIdOffsetMap, partitionIdOffset);
logger.info("Cluster {} with {} partitions", allClustersData.size(), oneClusterData.size());
@@ -225,7 +223,7 @@ public class StreamingBootstrap {
}
}
- private void startOneOffCubeStreaming(StreamingConfig streamingConfig, long startTimestamp, long endTimestamp) throws Exception {
+ private void startOneOffCubeStreaming(StreamingConfig streamingConfig, long startTimestamp, long endTimestamp, long margin) throws Exception {
final String cubeName = streamingConfig.getCubeName();
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
final StreamParser streamParser = getStreamParser(streamingConfig, Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
@@ -235,9 +233,6 @@ public class StreamingBootstrap {
return input.getColRef();
}
}));
- final int batchInterval = 5 * 60 * 1000;
- startTimestamp = TimeUtil.getNextPeriodStart(startTimestamp, batchInterval);
- endTimestamp = TimeUtil.getNextPeriodStart(endTimestamp, batchInterval);
final List<BlockingQueue<StreamMessage>> queues = Lists.newLinkedList();
int clusterId = 0;
@@ -248,7 +243,7 @@ public class StreamingBootstrap {
final CountDownLatch countDownLatch = new CountDownLatch(partitionCount);
for (int i = 0; i < partitionCount; ++i) {
final int idx = i;
- final long start = startTimestamp;
+ final long start = startTimestamp - margin;
executorService.submit(new Runnable() {
@Override
public void run() {
@@ -270,7 +265,7 @@ public class StreamingBootstrap {
}
logger.info(String.format("starting one off streaming build with timestamp{%d, %d}", startTimestamp, endTimestamp));
- OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp);
+ OneOffStreamBuilder oneOffStreamBuilder = new OneOffStreamBuilder(streamingConfig.getName(), queues, streamParser, new CubeStreamConsumer(cubeName), startTimestamp, endTimestamp, margin);
Executors.newSingleThreadExecutor().submit(oneOffStreamBuilder).get();
logger.info("one off build finished");
System.exit(0);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index c120015..9b93852 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -55,28 +55,35 @@ public class StreamingCLI {
Preconditions.checkArgument(args[0].equals("streaming"));
Preconditions.checkArgument(args[1].equals("start"));
- String kafkaConfName = args[2];
- int partition = Integer.parseInt(args[3]);
- int i = 4;
+ int i = 2;
BootstrapConfig bootstrapConfig = new BootstrapConfig();
while (i < args.length) {
String argName = args[i];
switch (argName) {
case "-oneoff":
- bootstrapConfig.setOneOff(Boolean.parseBoolean(args[i + 1]));
+ bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
break;
case "-start":
- bootstrapConfig.setStart(Long.parseLong(args[i + 1]));
+ bootstrapConfig.setStart(Long.parseLong(args[++i]));
break;
case "-end":
- bootstrapConfig.setEnd(Long.parseLong(args[i + 1]));
+ bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+ break;
+ case "-streaming":
+ bootstrapConfig.setStreaming(args[++i]);
+ break;
+ case "-partition":
+ bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+ break;
+ case "-margin":
+ bootstrapConfig.setMargin(Long.parseLong(args[++i]));
break;
default:
- throw new RuntimeException("invalid argName:" + argName);
+ logger.warn("ignore this arg:" + argName);
}
- i += 2;
+ i++;
}
- StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, partition, bootstrapConfig);
+ StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(bootstrapConfig);
} catch (Exception e) {
printArgsError(args);
logger.error("error start streaming", e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
index 54f966c..2748911 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/BatchCondition.java
@@ -5,7 +5,7 @@ package org.apache.kylin.streaming;
public interface BatchCondition {
enum Result {
- ACCEPT, REJECT, DISCARD,LAST_ACCEPT_FOR_BATCH
+ ACCEPT, REJECT, DISCARD, LAST_ACCEPT_FOR_BATCH
}
Result apply(ParsedStreamMessage message);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
index 2c062cc..f076988 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/OneOffStreamBuilder.java
@@ -22,9 +22,9 @@ public class OneOffStreamBuilder implements Runnable {
private final TimePeriodCondition batchCondition;
private StreamParser streamParser;
- public OneOffStreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, MicroStreamBatchConsumer consumer, long startTime, long endTime) {
+ public OneOffStreamBuilder(String streaming, List<BlockingQueue<StreamMessage>> queues, StreamParser streamParser, MicroStreamBatchConsumer consumer, long startTime, long endTime, long margin) {
Preconditions.checkArgument(queues.size() > 0);
- this.batchCondition = new TimePeriodCondition(startTime, endTime);
+ this.batchCondition = new TimePeriodCondition(startTime, endTime, margin);
this.streaming = streaming;
this.queues = queues;
this.consumer = Preconditions.checkNotNull(consumer);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3a31d916/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
index d3349d5..560504c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/TimePeriodCondition.java
@@ -6,10 +6,16 @@ public class TimePeriodCondition implements BatchCondition {
private final long startTime;
private final long endTime;
+ private final long margin;
public TimePeriodCondition(long startTime, long endTime) {
+ this(startTime, endTime, 0L);
+ }
+
+ public TimePeriodCondition(long startTime, long endTime, long margin) {
this.startTime = startTime;
this.endTime = endTime;
+ this.margin = margin;
}
public long getStartTime() {
@@ -22,10 +28,13 @@ public class TimePeriodCondition implements BatchCondition {
@Override
public Result apply(ParsedStreamMessage message) {
- if (message.getTimestamp() < startTime) {
+ final long timestamp = message.getTimestamp();
+ if (timestamp < startTime) {
return Result.DISCARD;
- } else if (message.getTimestamp() < endTime) {
+ } else if (timestamp < endTime) {
return Result.ACCEPT;
+ } else if (timestamp < endTime + margin) {
+ return Result.DISCARD;
} else {
return Result.REJECT;
}