You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:58 UTC
[21/50] incubator-kylin git commit: refactor
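refactor: replace StreamingBootstrap.startStreaming() with non-blocking start()/stop(), which create the II HTable before consuming and track Kafka consumers per streaming config and partition; add KafkaConsumer.stop() and Stream.EOF; simplify BuildIIWithStreamTest by no longer sorting rows before queueing them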
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b6b3388c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b6b3388c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b6b3388c
Branch: refs/heads/streaming-localdict
Commit: b6b3388ce2239fe36f60f8aad2349081813b10f7
Parents: 7088724
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 27 11:57:27 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 27 11:57:27 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 35 +++++++++++++++-----
.../kylin/job/streaming/StreamingCLI.java | 2 +-
.../apache/kylin/job/BuildIIWithStreamTest.java | 26 +++++----------
.../apache/kylin/streaming/KafkaConsumer.java | 8 ++++-
.../java/org/apache/kylin/streaming/Stream.java | 2 ++
5 files changed, 45 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ddaae29..65b23c4 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -35,6 +35,7 @@
package org.apache.kylin.job.streaming;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.PartitionMetadata;
@@ -50,8 +51,8 @@ import org.apache.kylin.streaming.*;
import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.Map;
+import java.util.concurrent.*;
/**
* Created by qianzhou on 3/26/15.
@@ -62,6 +63,8 @@ public class StreamingBootstrap {
private StreamManager streamManager;
private IIManager iiManager;
+ private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
+
public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
return new StreamingBootstrap(kylinConfig);
}
@@ -81,9 +84,17 @@ public class StreamingBootstrap {
}
}
- public void startStreaming(String streamingConf, int partitionId) throws Exception {
- final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
- Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
+ public void stop(String streaming, int partitionId) throws Exception {
+ final KafkaConsumer consumer = kafkaConsumers.remove(getKey(streaming, partitionId));
+ if (consumer != null) {
+ consumer.stop();
+ consumer.getStreamQueue().put(Stream.EOF);
+ }
+ }
+
+ public void start(String streaming, int partitionId) throws Exception {
+ final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streaming);
+ Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
Preconditions.checkNotNull(ii);
Preconditions.checkArgument(ii.getSegments().size() > 0);
@@ -96,7 +107,8 @@ public class StreamingBootstrap {
if (streamOffset < earliestOffset) {
streamOffset = earliestOffset;
}
-
+ String[] args = new String[]{"-iiname", kafkaConfig.getIiName(), "-htablename", iiSegment.getStorageLocationIdentifier()};
+ ToolRunner.run(new IICreateHTableJob(), args);
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), 0, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
@Override
@@ -107,11 +119,16 @@ public class StreamingBootstrap {
}
};
final IIDesc desc = ii.getDescriptor();
+ kafkaConsumers.put(getKey(streaming, partitionId), consumer);
- Executors.newSingleThreadExecutor().submit(consumer);
final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(), iiSegment.getStorageLocationIdentifier(), desc, partitionId);
task.setStreamParser(JsonStreamParser.instance);
- final Future<?> future = Executors.newSingleThreadExecutor().submit(task);
- future.get();
+
+ Executors.newSingleThreadExecutor().submit(consumer);
+ Executors.newSingleThreadExecutor().submit(task);
+ }
+
+ private String getKey(String streaming, int partitionId) {
+ return streaming + "_" + partitionId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 8813cb3..4977339 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -54,7 +54,7 @@ public class StreamingCLI {
}
if (args[0].equals("start")) {
String kafkaConfName = args[1];
- StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).startStreaming(kafkaConfName, 0);
+ StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, 0);
} else if (args.equals("stop")) {
} else {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 04a53f7..dae2d03 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -34,7 +34,6 @@
package org.apache.kylin.job;
-import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
@@ -59,14 +58,19 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.streaming.Stream;
import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -111,7 +115,7 @@ public class BuildIIWithStreamTest {
}
@AfterClass
- public static void after() throws Exception {
+ public static void afterClass() throws Exception {
backup();
}
@@ -213,22 +217,10 @@ public class BuildIIWithStreamTest {
ExecutorService executorService = Executors.newSingleThreadExecutor();
final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, segment.getStorageLocationIdentifier(), desc, 0);
int count = 0;
- List<String[]> rawData = Lists.newArrayList();
while (reader.next()) {
- desc.getTimestampColumn();
- rawData.add(reader.getRow());
+ queue.put(parse(reader.getRow()));
count++;
}
- final int timestampColumn = desc.getTimestampColumn();
- Collections.sort(rawData, new Comparator<String[]>() {
- @Override
- public int compare(String[] o1, String[] o2) {
- return o1[timestampColumn].compareTo(o2[timestampColumn]);
- }
- });
- for (String[] row : rawData) {
- queue.put(parse(row));
- }
logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
queue.put(new Stream(-1, null));
final Future<?> future = executorService.submit(streamBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 18c8403..b083dea 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -64,6 +64,8 @@ public abstract class KafkaConsumer implements Runnable {
private Logger logger;
+ private volatile boolean stop = false;
+
public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
this.topic = topic;
this.partitionId = partitionId;
@@ -92,7 +94,7 @@ public abstract class KafkaConsumer implements Runnable {
public void run() {
try {
Broker leadBroker = getLeadBroker();
- while (true) {
+ while (!stop) {
if (leadBroker == null) {
leadBroker = getLeadBroker();
}
@@ -123,4 +125,8 @@ public abstract class KafkaConsumer implements Runnable {
protected abstract void consume(long offset, ByteBuffer payload) throws Exception;
+ public void stop() {
+ this.stop = true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
index 2c6a86c..d337c4c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
@@ -42,6 +42,8 @@ public class Stream {
private long offset;
private byte[] rawData;
+ public static final Stream EOF = new Stream(-1, new byte[0]);
+
public Stream(long offset, byte[] rawData) {
this.offset = offset;
this.rawData = rawData;