You are viewing a plain-text version of this content. The canonical version is the original web page in the mailing-list archive; its link is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:43 UTC

[06/50] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/9a1c4cb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/9a1c4cb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/9a1c4cb6

Branch: refs/heads/streaming-localdict
Commit: 9a1c4cb6b3dcb967ab017c23de76cb910a103cb9
Parents: 71324f4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Mar 26 17:49:39 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Mar 26 17:49:39 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/invertedindex/IIInstance.java  |  12 +++
 .../apache/kylin/invertedindex/IIManager.java   |  17 ++--
 .../invertedindex/index/BatchSliceBuilder.java  |   6 +-
 .../org/apache/kylin/streaming/KafkaConfig.java |  22 ++--
 .../apache/kylin/streaming/KafkaConsumer.java   |  17 ++--
 .../kylin/streaming/StreamingBootstrap.java     | 102 +++++++++++++++++++
 .../apache/kylin/streaming/StreamingCLI.java    |  70 +++++++++++++
 .../kylin/streaming/KafkaConsumerTest.java      |   2 +-
 8 files changed, 210 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index 7684699..fd300e0 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -79,6 +80,9 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
     @JsonProperty("segments")
     private List<IISegment> segments = new ArrayList<IISegment>();
 
+    @JsonProperty("stream_offset")
+    private List<Long> streamOffsets = Lists.newArrayList();
+
     @JsonProperty("create_time_utc")
     private long createTimeUTC;
 
@@ -357,4 +361,12 @@ public class IIInstance extends RootPersistentEntity implements IRealization {
     public void setCost(int cost) {
         this.cost = cost;
     }
+
+    public List<Long> getStreamOffsets() {
+        return streamOffsets;
+    }
+
+    public void setStreamOffsets(List<Long> streamOffsets) {
+        this.streamOffsets = streamOffsets;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 6ebfbf8..b086d5d 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -240,19 +240,18 @@ public class IIManager implements IRealizationProvider {
     }
 
     private String generateStorageLocation() {
-        String namePrefix = IRealizationConstants.IIHbaseStorageLocationPrefix;
-        String tableName = "";
-        do {
-            StringBuffer sb = new StringBuffer();
-            sb.append(namePrefix);
+        while (true) { // retry until the random table name is not already in use
+            StringBuilder sb = new StringBuilder(IRealizationConstants.IIHbaseStorageLocationPrefix);
             for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
                 int idx = (int) (Math.random() * ALPHA_NUM.length());
                 sb.append(ALPHA_NUM.charAt(idx));
             }
-            tableName = sb.toString();
-        } while (this.usedStorageLocation.contains(tableName));
-
-        return tableName;
+            final String candidate = sb.toString();
+            if (!usedStorageLocation.contains(candidate)) {
+                return candidate;
+            }
+            // name already taken: loop again and draw a fresh random suffix
+        }
     }
 
     private void loadAllIIInstance() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
index 94b70c1..6ba328c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
@@ -86,11 +86,7 @@ public class BatchSliceBuilder {
     }
 
     private long increaseSliceTimestamp(long timestamp) {
-        if (timestamp < sliceTimestamp) {
-            throw new IllegalStateException();
-        }
-
-        if (timestamp == sliceTimestamp) {
+        if (timestamp <= sliceTimestamp) {
             return ++timestamp; // ensure slice timestamp increases
         } else {
             return timestamp;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index b22c7e0..5194e9d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -76,10 +76,8 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("bufferSize")
     private int bufferSize;
 
-    @JsonProperty("iiDesc")
-    private String iiDesc;
-
-    private int partitionId;
+    @JsonProperty("iiName")
+    private String iiName;
 
     public int getTimeout() {
         return timeout;
@@ -121,14 +119,6 @@ public class KafkaConfig extends RootPersistentEntity {
         this.topic = topic;
     }
 
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
     public void setBrokerConfigs(List<BrokerConfig> brokerConfigs) {
         this.brokerConfigs = brokerConfigs;
     }
@@ -143,6 +133,14 @@ public class KafkaConfig extends RootPersistentEntity {
         });
     }
 
+    public String getIiName() {
+        return iiName;
+    }
+
+    public void setIiName(String iiName) {
+        this.iiName = iiName;
+    }
+
     public String getName() {
         return name;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
index 42a0f1f..910041c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConsumer.java
@@ -58,15 +58,16 @@ public abstract class KafkaConsumer implements Runnable {
 
     private KafkaConfig kafkaConfig;
     private List<Broker> replicaBrokers;
-    private AtomicLong offset = new AtomicLong();
+    private long offset;
     private BlockingQueue<Stream> streamQueue;
 
     private Logger logger;
 
-    public KafkaConsumer(String topic, int partitionId, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
+    public KafkaConsumer(String topic, int partitionId, long startOffset, List<Broker> initialBrokers, KafkaConfig kafkaConfig) {
         this.topic = topic;
         this.partitionId = partitionId;
         this.kafkaConfig = kafkaConfig;
+        offset = startOffset;
         this.replicaBrokers = initialBrokers;
         logger = LoggerFactory.getLogger("KafkaConsumer_" + topic + "_" + partitionId);
         streamQueue = new ArrayBlockingQueue<Stream>(kafkaConfig.getMaxReadCount());
@@ -90,12 +91,6 @@ public abstract class KafkaConsumer implements Runnable {
     public void run() {
         try {
             Broker leadBroker = getLeadBroker();
-            if (leadBroker == null) {
-                logger.warn("cannot find lead broker");
-            } else {
-                final long lastOffset = KafkaRequester.getLastOffset(topic, partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
-                offset.set(lastOffset);
-            }
             while (true) {
                 if (leadBroker == null) {
                     leadBroker = getLeadBroker();
@@ -105,9 +100,9 @@ public abstract class KafkaConsumer implements Runnable {
                     continue;
                 }
 
-                final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset.get(), leadBroker, kafkaConfig);
+                final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, kafkaConfig);
                 if (fetchResponse.errorCode(topic, partitionId) != 0) {
-                    logger.warn("fetch response offset:" + offset.get() + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
+                    logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
                     continue;
                 }
                 for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
@@ -117,7 +112,7 @@ public abstract class KafkaConsumer implements Runnable {
                         logger.error("error put streamQueue", e);
                         break;
                     }
-                    offset.incrementAndGet();
+                    offset++;
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
new file mode 100644
index 0000000..4528a72
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingBootstrap.java
@@ -0,0 +1,102 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.PartitionMetadata;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.invertedindex.IIDescManager;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingBootstrap {
+
+    private static final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+    private static final StreamManager streamManager = StreamManager.getInstance(kylinConfig);
+    private static final IIManager iiManager = IIManager.getInstance(kylinConfig);
+    private static final IIDescManager iiDescManager = IIDescManager.getInstance(kylinConfig);
+
+    /** Returns the leader broker of the partition, or null when the metadata lookup fails or reports an error. */
+    private static Broker getLeadBroker(KafkaConfig kafkaConfig, int partitionId) {
+        final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaConfig.getTopic(), partitionId, kafkaConfig.getBrokers(), kafkaConfig);
+        if (partitionMetadata != null && partitionMetadata.errorCode() == 0) {
+            return partitionMetadata.leader();
+        }
+        return null;
+    }
+
+    /**
+     * Consumes partition {@code partitionId} of the topic configured by {@code streamingConf}, resuming from the
+     * offset recorded on the bound II (never earlier than the earliest offset kafka still has), and prints each message.
+     */
+    public static void startStreaming(String streamingConf, int partitionId) throws Exception {
+        final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streamingConf);
+        Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streamingConf);
+        final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
+        Preconditions.checkNotNull(ii, "cannot find II:" + kafkaConfig.getIiName());
+        final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
+        Preconditions.checkState(leadBroker != null, "cannot find lead broker");
+        final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
+        // stream_offset starts as an empty list, so this partition may have no recorded offset yet
+        final long recordedOffset = partitionId < ii.getStreamOffsets().size() ? ii.getStreamOffsets().get(partitionId) : earliestOffset;
+        // messages before earliestOffset may already be purged by kafka; never start before it
+        final long streamOffset = Math.max(recordedOffset, earliestOffset);
+
+        // consume the partition the caller asked for, not a fixed one
+        final KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamOffset, kafkaConfig.getBrokers(), kafkaConfig) {
+            @Override
+            protected void consume(long offset, ByteBuffer payload) throws Exception {
+                // copy only the readable bytes; limit() overcounts when position is not 0
+                final byte[] bytes = new byte[payload.remaining()];
+                payload.get(bytes);
+                getStreamQueue().put(new Stream(offset, bytes));
+            }
+        };
+        Executors.newSingleThreadExecutor().execute(consumer);
+        while (true) {
+            // take() blocks until a message is available instead of spinning on poll()
+            final Stream stream = consumer.getStreamQueue().take();
+            System.out.println("offset:" + stream.getOffset() + " content:" + new String(stream.getRawData()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
new file mode 100644
index 0000000..70290f1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingCLI.java
@@ -0,0 +1,70 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by qianzhou on 3/26/15.
+ */
+public class StreamingCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingCLI.class);
+
+    /** Entry point: {@code start <kafkaConfName>} starts consuming partition 0; {@code stop} is not implemented yet. */
+    public static void main(String[] args) {
+        try {
+            if (args.length < 2) {
+                printArgsError(args);
+                return;
+            }
+            if (args[0].equals("start")) {
+                StreamingBootstrap.startStreaming(args[1], 0);
+            } else if (args[0].equals("stop")) {
+                // TODO: stopping a running stream is not implemented yet
+            } else {
+                printArgsError(args);
+            }
+        } catch (Exception e) {
+            logger.error("error running streaming cli with args:" + StringUtils.join(args, " "), e);
+        }
+    }
+
+    private static void printArgsError(String[] args) {
+        logger.warn("invalid args:" + StringUtils.join(args, " "));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/9a1c4cb6/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
index c824c48..337dfc7 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -90,7 +90,7 @@ public class KafkaConsumerTest extends KafkaBaseTest {
         final ExecutorService executorService = Executors.newFixedThreadPool(kafkaTopicMeta.getPartitionIds().size());
         List<BlockingQueue<Stream>> queues = Lists.newArrayList();
         for (Integer partitionId : kafkaTopicMeta.getPartitionIds()) {
-            KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, kafkaConfig.getBrokers(), kafkaConfig) {
+            KafkaConsumer consumer = new KafkaConsumer(kafkaTopicMeta.getName(), partitionId, 0, kafkaConfig.getBrokers(), kafkaConfig) {
                 @Override
                 protected void consume(long offset, ByteBuffer payload) throws Exception {
                     //TODO use ByteBuffer maybe