You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/05/29 09:44:50 UTC
[2/4] incubator-kylin git commit: streaming cubing: fix demo
streaming cubing: fix demo
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/341cda00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/341cda00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/341cda00
Branch: refs/heads/0.8.0
Commit: 341cda005b5805ffa678f18fa85d354a2a4a1529
Parents: a6a9d94
Author: honma <ho...@ebay.com>
Authored: Thu May 28 23:06:12 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri May 29 15:44:25 2015 +0800
----------------------------------------------------------------------
.../main/java/org/apache/kylin/common/KylinConfig.java | 3 +--
.../java/org/apache/kylin/common/util/BasicTest.java | 1 +
.../apache/kylin/job/streaming/CubeStreamBuilder.java | 3 +--
.../apache/kylin/job/streaming/StreamingBootstrap.java | 13 ++++++-------
.../org/apache/kylin/storage/StorageEngineFactory.java | 2 +-
5 files changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cbb049d..b049fd0 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -19,7 +19,6 @@
package org.apache.kylin.common;
import com.google.common.collect.Sets;
-import jodd.util.StringUtil;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.IOUtils;
@@ -496,7 +495,7 @@ public class KylinConfig {
private String[] getOptionalStringArray(String prop) {
final String property = System.getProperty(prop);
- if (!StringUtil.isBlank(property))
+ if (!StringUtils.isBlank(property))
return property.split("\\s*,\\s*");
return kylinConfig.getStringArray(prop);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 98ee807..daab36f 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -62,6 +62,7 @@ public class BasicTest {
@Test
public void test0() throws Exception {
+ System.out.println(Long.MAX_VALUE);
IdentityHashMap<String, Void> a = new IdentityHashMap<>();
IdentityHashMap<String, Void> b = new IdentityHashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 82892dc..0bd2792 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -9,7 +9,6 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -393,7 +392,7 @@ public class CubeStreamBuilder extends StreamBuilder {
@Override
protected int batchInterval() {
- return 30 * 60 * 1000;//30 min
+ return 5 * 60 * 1000;//5 min
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 3929098..0c67d78 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@@ -131,13 +130,14 @@ public class StreamingBootstrap {
private List<BlockingQueue<StreamMessage>> consume(KafkaConfig kafkaConfig, final int partitionCount) {
List<BlockingQueue<StreamMessage>> result = Lists.newArrayList();
- for (int partitionId = 0 ; partitionId < partitionCount && partitionId < 10; ++partitionId) {
+ for (int partitionId = 0; partitionId < partitionCount && partitionId < 3; ++partitionId) {
final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
- long streamingOffset = getEarliestStreamingOffset(kafkaConfig.getName(), 0, 0);
+
final long latestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(), partitionId, OffsetRequest.LatestTime(), leadBroker, kafkaConfig);
- streamingOffset = Math.max(streamingOffset, latestOffset);
- KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId,
- streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
+ long streamingOffset = latestOffset;
+ logger.info("submitting offset:" + streamingOffset);
+
+ KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, 1);
Executors.newSingleThreadExecutor().submit(consumer);
result.add(consumer.getStreamQueue(0));
}
@@ -210,7 +210,6 @@ public class StreamingBootstrap {
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
kafkaConsumers.put(getKey(kafkaConfig.getName(), partitionId), consumer);
-
Executors.newSingleThreadExecutor().submit(consumer);
final ExecutorService streamingBuilderPool = Executors.newFixedThreadPool(parallelism);
for (int i = startShard; i < endShard; ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/341cda00/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
index 0d720ab..5fc757a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
+++ b/storage/src/main/java/org/apache/kylin/storage/StorageEngineFactory.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
* @author xjiang
*/
public class StorageEngineFactory {
- private static boolean allowStorageLayerCache = true;
+ private static boolean allowStorageLayerCache = false;
public static IStorageEngine getStorageEngine(IRealization realization) {