You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:25 UTC
[13/50] [abbrv] incubator-kylin git commit: KYLIN-749 bug fix
KYLIN-749 bug fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/29da9ece
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/29da9ece
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/29da9ece
Branch: refs/heads/streaming-localdict
Commit: 29da9ece5b55c21a1f880cb66cfff0348b529c3f
Parents: c8bc428
Author: honma <ho...@ebay.com>
Authored: Tue May 5 18:00:15 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed May 6 10:16:48 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 19 ++++++++-----------
.../java/org/apache/kylin/job/DataGenTest.java | 4 +++-
2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29da9ece/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 1688fc2..1f7608b 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -34,19 +34,13 @@
package org.apache.kylin.job.streaming;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.javaapi.PartitionMetadata;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -56,8 +50,11 @@ import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* Created by qianzhou on 3/26/15.
@@ -140,7 +137,7 @@ public class StreamingBootstrap {
streamingOffset = Math.max(streamingOffset, earliestOffset);
logger.info("starting offset is " + streamingOffset);
- IICreateHTableJob.main(new String[] { "-iiname", "nous_ii", "-htablename", "KYLIN_2SKJ8JNOUS" });
+ IICreateHTableJob.main(new String[] { "-iiname", kafkaConfig.getIiName(), "-htablename", iiSegment.getStorageLocationIdentifier() });
KafkaConsumer consumer = new KafkaConsumer(kafkaConfig.getTopic(), partitionId, streamingOffset, kafkaConfig.getBrokers(), kafkaConfig, parallelism);
kafkaConsumers.put(getKey(streaming, partitionId), consumer);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/29da9ece/job/src/test/java/org/apache/kylin/job/DataGenTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DataGenTest.java b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
index 4e3cf17..7b10a22 100644
--- a/job/src/test/java/org/apache/kylin/job/DataGenTest.java
+++ b/job/src/test/java/org/apache/kylin/job/DataGenTest.java
@@ -72,7 +72,7 @@ public class DataGenTest extends LocalFileMetadataTestCase {
int totalCount = 10000;
int counter = 0;
- Iterator<String> iterator = StreamingDataGenerator.generate(DateFormat.stringToMillis("2015-01-03"), DateFormat.stringToMillis("2015-01-06"), totalCount);
+ Iterator<String> iterator = StreamingDataGenerator.generate(DateFormat.stringToMillis("2015-01-03"), DateFormat.stringToMillis("2015-02-05"), totalCount);
iterator = SortUtil.extractAndSort(iterator, new Function<String, Comparable>() {
public Comparable apply(String input) {
@@ -80,6 +80,8 @@ public class DataGenTest extends LocalFileMetadataTestCase {
}
});
+ //FileUtils.writeLines(new File("//Users/honma/streaming_table_data"),Lists.newArrayList(iterator));
+
long lastTs = 0;
while (iterator.hasNext()) {
counter++;