You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:12 UTC

[35/50] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b979dfae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b979dfae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b979dfae

Branch: refs/heads/streaming-localdict
Commit: b979dfaea999b548e089a0e41d38e164e7b46662
Parents: dee2955
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 27 15:35:57 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 27 15:35:57 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java    | 17 +++++++++++------
 .../apache/kylin/job/streaming/StreamingCLI.java   |  2 --
 .../org/apache/kylin/streaming/KafkaConsumer.java  | 10 ++++------
 3 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b979dfae/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 65b23c4..5d1673c 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -66,7 +66,14 @@ public class StreamingBootstrap {
     private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
 
     public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
-        return new StreamingBootstrap(kylinConfig);
+        final StreamingBootstrap bootstrap = new StreamingBootstrap(kylinConfig);
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                bootstrap.stop();
+            }
+        }));
+        return bootstrap;
     }
 
     private StreamingBootstrap(KylinConfig kylinConfig) {
@@ -84,11 +91,9 @@ public class StreamingBootstrap {
         }
     }
 
-    public void stop(String streaming, int partitionId) throws Exception {
-        final KafkaConsumer consumer = kafkaConsumers.remove(getKey(streaming, partitionId));
-        if (consumer != null) {
+    public void stop() {
+        for (KafkaConsumer consumer : kafkaConsumers.values()) {
             consumer.stop();
-            consumer.getStreamQueue().put(Stream.EOF);
         }
     }
 
@@ -125,7 +130,7 @@ public class StreamingBootstrap {
         task.setStreamParser(JsonStreamParser.instance);
 
         Executors.newSingleThreadExecutor().submit(consumer);
-        Executors.newSingleThreadExecutor().submit(task);
+        Executors.newSingleThreadExecutor().submit(task).get();
     }
 
     private String getKey(String streaming, int partitionId) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b979dfae/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 4977339..219ca41 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -55,8 +55,6 @@ public class StreamingCLI {
             if (args[0].equals("start")) {
                 String kafkaConfName = args[1];
                 StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, 0);
-            } else if (args.equals("stop")) {
-
             } else {
                 printArgsError(args);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b979dfae/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index b083dea..868673d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -34,7 +34,6 @@
 
 package org.apache.kylin.streaming;
 
-import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
@@ -44,10 +43,8 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Created by qianzhou on 2/15/15.
@@ -64,7 +61,7 @@ public abstract class KafkaConsumer implements Runnable {
 
     private Logger logger;
 
-    private volatile boolean stop = false;
+    private volatile boolean isRunning = true;
 
     public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
         this.topic = topic;
@@ -94,7 +91,7 @@ public abstract class KafkaConsumer implements Runnable {
     public void run() {
         try {
             Broker leadBroker = getLeadBroker();
-            while (!stop) {
+            while (isRunning) {
                 if (leadBroker == null) {
                     leadBroker = getLeadBroker();
                 }
@@ -118,6 +115,7 @@ public abstract class KafkaConsumer implements Runnable {
                     offset++;
                 }
             }
+            getStreamQueue().put(Stream.EOF);
         } catch (Exception e) {
             logger.error("consumer has encountered an error", e);
         }
@@ -126,7 +124,7 @@ public abstract class KafkaConsumer implements Runnable {
     protected abstract void consume(long offset, ByteBuffer payload) throws Exception;
 
     public void stop() {
-        this.stop = true;
+        this.isRunning = false;
     }
 
 }