You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/04 09:54:32 UTC

incubator-kylin git commit: fix hbase rowkey mapping

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 3614eef35 -> 503b958fa


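Fix HBase rowkey mapping: key dictionary columns by their position in the
non-derived dimension column list, and build the cube stream parser from the
joined flat table's column list instead of the cube's full column list.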


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/503b958f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/503b958f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/503b958f

Branch: refs/heads/0.8.0
Commit: 503b958fab33a17418b53a7e166c0d18c7c3ac70
Parents: 3614eef
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jun 4 15:54:14 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jun 4 15:54:43 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/streaming/CubeStreamConsumer.java |  6 ++----
 .../apache/kylin/job/streaming/StreamingBootstrap.java | 13 ++++++++++++-
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/503b958f/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 00b9d14..bd7c6cb 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -216,12 +216,10 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
 
     private Map<TblColRef, Dictionary<?>> buildDictionary(final CubeInstance cubeInstance, List<List<String>> recordList) throws IOException {
         final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived();
-        final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+        int index = 0;
         for (TblColRef column : columnsNeedToBuildDictionary) {
-            final int index = allDimensions.indexOf(column);
-            Preconditions.checkArgument(index >= 0);
-            tblColRefMap.put(index, column);
+            tblColRefMap.put(index++, column);
         }
 
         HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/503b958f/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index f45ec5f..f823e22 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -34,6 +34,7 @@
 
 package org.apache.kylin.job.streaming;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -46,16 +47,19 @@ import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.streaming.*;
 import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Map;
@@ -167,7 +171,14 @@ public class StreamingBootstrap {
         MicroBatchCondition condition = new MicroBatchCondition(Integer.MAX_VALUE, batchInterval);
         long startTimestamp = cubeInstance.getDateRangeEnd() == 0 ? TimeUtil.getNextPeriodStart(System.currentTimeMillis(), (long) batchInterval) : cubeInstance.getDateRangeEnd();
         StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, condition, new CubeStreamConsumer(cubeName), startTimestamp);
-        cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
+        cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig,
+                Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
+                    @Nullable
+                    @Override
+                    public TblColRef apply(IntermediateColumnDesc input) {
+                        return input.getColRef();
+                    }
+                })));
         cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         future.get();