You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/04 09:54:32 UTC
incubator-kylin git commit: fix HBase rowkey mapping
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 3614eef35 -> 503b958fa
fix HBase rowkey mapping
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/503b958f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/503b958f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/503b958f
Branch: refs/heads/0.8.0
Commit: 503b958fab33a17418b53a7e166c0d18c7c3ac70
Parents: 3614eef
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 15:54:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jun 4 15:54:43 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/job/streaming/CubeStreamConsumer.java | 6 ++----
.../apache/kylin/job/streaming/StreamingBootstrap.java | 13 ++++++++++++-
2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/503b958f/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 00b9d14..bd7c6cb 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -216,12 +216,10 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
private Map<TblColRef, Dictionary<?>> buildDictionary(final CubeInstance cubeInstance, List<List<String>> recordList) throws IOException {
final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived();
- final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+ int index = 0;
for (TblColRef column : columnsNeedToBuildDictionary) {
- final int index = allDimensions.indexOf(column);
- Preconditions.checkArgument(index >= 0);
- tblColRefMap.put(index, column);
+ tblColRefMap.put(index++, column);
}
HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/503b958f/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index f45ec5f..f823e22 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -34,6 +34,7 @@
package org.apache.kylin.job.streaming;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -46,16 +47,19 @@ import org.apache.kylin.common.persistence.HBaseConnection;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.streaming.*;
import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Map;
@@ -167,7 +171,14 @@ public class StreamingBootstrap {
MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
- cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
+ cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig,
+ Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
+ @Nullable
+ @Override
+ public TblColRef apply(IntermediateColumnDesc input) {
+ return input.getColRef();
+ }
+ })));
cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
future.get();