You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:12 UTC
[35/50] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b979dfae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b979dfae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b979dfae
Branch: refs/heads/streaming-localdict
Commit: b979dfaea999b548e089a0e41d38e164e7b46662
Parents: dee2955
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 27 15:35:57 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 27 15:35:57 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 17 +++++++++++------
.../apache/kylin/job/streaming/StreamingCLI.java | 2 --
.../org/apache/kylin/streaming/KafkaConsumer.java | 10 ++++------
3 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b979dfae/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 65b23c4..5d1673c 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -66,7 +66,14 @@ public class StreamingBootstrap {
private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
- return new StreamingBootstrap(kylinConfig);
+ final StreamingBootstrap bootstrap = new StreamingBootstrap(kylinConfig);
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ bootstrap.stop();
+ }
+ }));
+ return bootstrap;
}
private StreamingBootstrap(KylinConfig kylinConfig) {
@@ -84,11 +91,9 @@ public class StreamingBootstrap {
}
}
- public void stop(String streaming, int partitionId) throws Exception {
- final KafkaConsumer consumer = kafkaConsumers.remove(getKey(streaming, partitionId));
- if (consumer != null) {
+ public void stop() {
+ for (KafkaConsumer consumer : kafkaConsumers.values()) {
consumer.stop();
- consumer.getStreamQueue().put(Stream.EOF);
}
}
@@ -125,7 +130,7 @@ public class StreamingBootstrap {
task.setStreamParser(JsonStreamParser.instance);
Executors.newSingleThreadExecutor().submit(consumer);
- Executors.newSingleThreadExecutor().submit(task);
+ Executors.newSingleThreadExecutor().submit(task).get();
}
private String getKey(String streaming, int partitionId) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b979dfae/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 4977339..219ca41 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -55,8 +55,6 @@ public class StreamingCLI {
if (args[0].equals("start")) {
String kafkaConfName = args[1];
StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, 0);
- } else if (args.equals("stop")) {
-
} else {
printArgsError(args);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b979dfae/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index b083dea..868673d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -34,7 +34,6 @@
package org.apache.kylin.streaming;
-import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
@@ -44,10 +43,8 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
/**
* Created by qianzhou on 2/15/15.
@@ -64,7 +61,7 @@ public abstract class KafkaConsumer implements Runnable {
private Logger logger;
- private volatile boolean stop = false;
+ private volatile boolean isRunning = true;
public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
this.topic = topic;
@@ -94,7 +91,7 @@ public abstract class KafkaConsumer implements Runnable {
public void run() {
try {
Broker leadBroker = getLeadBroker();
- while (!stop) {
+ while (isRunning) {
if (leadBroker == null) {
leadBroker = getLeadBroker();
}
@@ -118,6 +115,7 @@ public abstract class KafkaConsumer implements Runnable {
offset++;
}
}
+ getStreamQueue().put(Stream.EOF);
} catch (Exception e) {
logger.error("consumer has encountered an error", e);
}
@@ -126,7 +124,7 @@ public abstract class KafkaConsumer implements Runnable {
protected abstract void consume(long offset, ByteBuffer payload) throws Exception;
public void stop() {
- this.stop = true;
+ this.isRunning = false;
}
}