You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/29 09:44:50 UTC

[2/4] incubator-kylin git commit: streaming cubing: fix demo

streaming cubing: fix demo


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/341cda00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/341cda00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/341cda00

Branch: refs/heads/0.8.0
Commit: 341cda005b5805ffa678f18fa85d354a2a4a1529
Parents: a6a9d94
Author: honma <ho...@ebay.com>
Authored: Thu May 28 23:06:12 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri May 29 15:44:25 2015 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/common/KylinConfig.java |  3 +--
 .../java/org/apache/kylin/common/util/BasicTest.java   |  1 +
 .../apache/kylin/job/streaming/CubeStreamBuilder.java  |  3 +--
 .../apache/kylin/job/streaming/StreamingBootstrap.java | 13 ++++++-------
 .../org/apache/kylin/storage/StorageEngineFactory.java |  2 +-
 5 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cbb049d..b049fd0 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.common;
 
 import com.google.common.collect.Sets;
-import jodd.util.StringUtil;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.IOUtils;
@@ -496,7 +495,7 @@ public class KylinConfig {
 
     private String[] getOptionalStringArray(String prop) {
         final String property = System.getProperty(prop);
-        if (!StringUtil.isBlank(property))
+        if (!StringUtils.isBlank(property))
             return property.split("\\s*,\\s*");
 
         return kylinConfig.getStringArray(prop);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 98ee807..daab36f 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -62,6 +62,7 @@ public class BasicTest {
 
     @Test
     public void test0() throws Exception {
+        System.out.println(Long.MAX_VALUE);
 
         IdentityHashMap<String, Void> a = new IdentityHashMap<>();
         IdentityHashMap<String, Void> b = new IdentityHashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 82892dc..0bd2792 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -9,7 +9,6 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -393,7 +392,7 @@ public class CubeStreamBuilder extends StreamBuilder {
 
     @Override
     protected int batchInterval() {
-        return 30 * 60 * 1000;//30 min
+        return 5 * 60 * 1000;//30 min
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 3929098..0c67d78 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
@@ -131,13 +130,14 @@ public class StreamingBootstrap {
 
     private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
         List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
-        for (int partitionId = 0 ; partitionId < partitionCount && partitionId < 10; ++partitionId) {
+        for (int partitionId = 0; partitionId < partitionCount && partitionId < 3; ++partitionId) {
             final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
-            long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+
             final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
-            streamingOffset = Math.max(streamingOffset, latestOffset);
-            KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
-                    streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+            long streamingOffset = latestOffset;
+            logger.info("submitting offset:" + streamingOffset);
+
+            KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
             Executors.newSingleThreadExecutor().submit(consumer);
             result.add(consumer.getStreamQueue(0));
         }
@@ -210,7 +210,6 @@ public class StreamingBootstrap {
         KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
         kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
 
-
         Executors.newSingleThreadExecutor().submit(consumer);
         final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
         for (int i = startShard; i < endShard; ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 0d720ab..5fc757a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
  * @author xjiang
  */
 public class StorageEngineFactory {
-    private static boolean allowStorageLayerCache = true;
+    private static boolean allowStorageLayerCache = false;
 
     public static IStorageEngine getStorageEngine(IRealization realization) {