You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:58 UTC

[21/50] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b6b3388c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b6b3388c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b6b3388c

Branch: refs/heads/streaming-localdict
Commit: b6b3388ce2239fe36f60f8aad2349081813b10f7
Parents: 7088724
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 27 11:57:27 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 27 11:57:27 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 35 +++++++++++++++-----
 .../kylin/job/streaming/StreamingCLI.java       |  2 +-
 .../apache/kylin/job/BuildIIWithStreamTest.java | 26 +++++----------
 .../apache/kylin/streaming/KafkaConsumer.java   |  8 ++++-
 .../java/org/apache/kylin/streaming/Stream.java |  2 ++
 5 files changed, 45 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ddaae29..65b23c4 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -35,6 +35,7 @@
 package org.apache.kylin.job.streaming;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import kafka.api.OffsetRequest;
 import kafka.cluster.Broker;
 import kafka.javaapi.PartitionMetadata;
@@ -50,8 +51,8 @@ import org.apache.kylin.streaming.*;
 import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.Map;
+import java.util.concurrent.*;
 
 /**
  * Created by qianzhou on 3/26/15.
@@ -62,6 +63,8 @@ public class StreamingBootstrap {
     private StreamManager streamManager;
     private IIManager iiManager;
 
+    private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
+
     public static StreamingBootstrap getInstance(KylinConfig kylinConfig) {
         return new StreamingBootstrap(kylinConfig);
     }
@@ -81,9 +84,17 @@ public class StreamingBootstrap {
         }
     }
 
-    public void startStreaming(String streamingConf, int partitionId) throws Exception {
-        final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
-        Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
+    public void stop(String streaming, int partitionId) throws Exception {
+        final KafkaConsumer consumer = kafkaConsumers.remove(getKey(streaming, partitionId));
+        if (consumer != null) {
+            consumer.stop();
+            consumer.getStreamQueue().put(Stream.EOF);
+        }
+    }
+
+    public void start(String streaming, int partitionId) throws Exception {
+        final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streaming);
+        Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
         final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
         Preconditions.checkNotNull(ii);
         Preconditions.checkArgument(ii.getSegments().size() > 0);
@@ -96,7 +107,8 @@ public class StreamingBootstrap {
         if (streamOffset < earliestOffset) {
             streamOffset = earliestOffset;
         }
-
+        String[] args = new String[]{"-iiname", kafkaConfig.getIiName(), "-htablename", iiSegment.getStorageLocationIdentifier()};
+        ToolRunner.run(new IICreateHTableJob(), args);
 
         KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), 0, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
             @Override
@@ -107,11 +119,16 @@ public class StreamingBootstrap {
             }
         };
         final IIDesc desc = ii.getDescriptor();
+        kafkaConsumers.put(getKey(streaming, partitionId), consumer);
 
-        Executors.newSingleThreadExecutor().submit(consumer);
         final IIStreamBuilder task = new IIStreamBuilder(consumer.getStreamQueue(), iiSegment.getStorageLocationIdentifier(), desc, partitionId);
         task.setStreamParser(JsonStreamParser.instance);
-        final Future<?> future = Executors.newSingleThreadExecutor().submit(task);
-        future.get();
+
+        Executors.newSingleThreadExecutor().submit(consumer);
+        Executors.newSingleThreadExecutor().submit(task);
+    }
+
+    private String getKey(String streaming, int partitionId) {
+        return streaming + "_" + partitionId;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 8813cb3..4977339 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -54,7 +54,7 @@ public class StreamingCLI {
             }
             if (args[0].equals("start")) {
                 String kafkaConfName = args[1];
-                StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).startStreaming(kafkaConfName, 0);
+                StreamingBootstrap.getInstance(KylinConfig.getInstanceFromEnv()).start(kafkaConfName, 0);
             } else if (args.equals("stop")) {
 
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 04a53f7..dae2d03 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -34,7 +34,6 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
@@ -59,14 +58,19 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.streaming.Stream;
 import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -111,7 +115,7 @@ public class BuildIIWithStreamTest {
     }
 
     @AfterClass
-    public static void after() throws Exception {
+    public static void afterClass() throws Exception {
         backup();
     }
 
@@ -213,22 +217,10 @@ public class BuildIIWithStreamTest {
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         final IIStreamBuilder streamBuilder = new IIStreamBuilder(queue, segment.getStorageLocationIdentifier(), desc, 0);
         int count = 0;
-        List<String[]> rawData = Lists.newArrayList();
         while (reader.next()) {
-            desc.getTimestampColumn();
-            rawData.add(reader.getRow());
+            queue.put(parse(reader.getRow()));
             count++;
         }
-        final int timestampColumn = desc.getTimestampColumn();
-        Collections.sort(rawData, new Comparator<String[]>() {
-            @Override
-            public int compare(String[] o1, String[] o2) {
-                return o1[timestampColumn].compareTo(o2[timestampColumn]);
-            }
-        });
-        for (String[] row : rawData) {
-            queue.put(parse(row));
-        }
         logger.info("total record count:" + count + " htable:" + segment.getStorageLocationIdentifier());
         queue.put(new Stream(-1, null));
         final Future<?> future = executorService.submit(streamBuilder);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 18c8403..b083dea 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -64,6 +64,8 @@ public abstract class KafkaConsumer implements Runnable {
 
     private Logger logger;
 
+    private volatile boolean stop = false;
+
     public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
         this.topic = topic;
         this.partitionId = partitionId;
@@ -92,7 +94,7 @@ public abstract class KafkaConsumer implements Runnable {
     public void run() {
         try {
             Broker leadBroker = getLeadBroker();
-            while (true) {
+            while (!stop) {
                 if (leadBroker == null) {
                     leadBroker = getLeadBroker();
                 }
@@ -123,4 +125,8 @@ public abstract class KafkaConsumer implements Runnable {
 
     protected abstract void consume(long offset, ByteBuffer payload) throws Exception;
 
+    public void stop() {
+        this.stop = true;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b6b3388c/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
index 2c6a86c..d337c4c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/Stream.java
@@ -42,6 +42,8 @@ public class Stream {
     private long offset;
     private byte[] rawData;
 
+    public static final Stream EOF = new Stream(-1, new byte[0]);
+
     public Stream(long offset, byte[] rawData) {
         this.offset = offset;
         this.rawData = rawData;