You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/12 09:05:48 UTC

incubator-kylin git commit: KYLIN-1137 fix bug in MergeCuboidMapper

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1126 03c631095 -> 0cead28c3


KYLIN-1137 fix bug in MergeCuboidMapper

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0cead28c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0cead28c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0cead28c

Branch: refs/heads/KYLIN-1126
Commit: 0cead28c397accccff1d564b1a48109c35a02e4d
Parents: 03c6310
Author: shaofengshi <sh...@apache.org>
Authored: Thu Nov 12 16:05:05 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 12 16:05:05 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/BuildCubeWithEngineTest.java      |  6 +-
 .../engine/mr/steps/MergeCuboidMapper.java      | 73 ++++++++++----------
 2 files changed, 41 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0cead28c/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index e1f12da..04cb01d 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -62,7 +62,7 @@ public class BuildCubeWithEngineTest {
     private CubeManager cubeManager;
     private DefaultScheduler scheduler;
     protected ExecutableManager jobService;
-    private static boolean fastBuildMode = true;
+    private static boolean fastBuildMode = false;
 
     private static final Log logger = LogFactory.getLog(BuildCubeWithEngineTest.class);
 
@@ -87,8 +87,8 @@ public class BuildCubeWithEngineTest {
         ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath());
 
         String fastModeStr = System.getProperty("fastBuildMode");
-        if (fastModeStr != null && fastModeStr.equalsIgnoreCase("false")) {
-            fastBuildMode = false;
+        if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) {
+            fastBuildMode = true;
             logger.info("Will not use fast build mode");
         } else {
             logger.info("Will use fast build mode");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0cead28c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index d3d0d7c..ac46b99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -18,14 +18,8 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
@@ -48,12 +42,19 @@ import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
 import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * @author ysong1, honma
  */
@@ -122,7 +123,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
-        
+
         if (cubeDesc.hasMeasureUsingDictionary()) {
             measuresDescs = cubeDesc.getMeasures();
             codec = new MeasureCodec(measuresDescs);
@@ -230,7 +231,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        
         // encode measure if it uses dictionary 
         if (cubeDesc.hasMeasureUsingDictionary()) {
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
@@ -241,33 +241,36 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
                 MeasureDesc measureDesc = measuresDescs.get(idx);
                 String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
-                TblColRef col = cubeDesc.findColumnRef(cubeDesc.getFactTable(), displayCol);
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                int topNSize = topNCounters.size();
-                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length  - bufOffset || //
-                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length  - bufOffset || //
-                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
-                }
-
-                for (Counter<ByteArray> c : topNCounters) {
-                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                    int idInMergedDict;
-                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                    if (size < 0) {
-                        idInMergedDict = mergedDict.nullId();
-                    } else {
-                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
+                if (StringUtils.isNotEmpty(displayCol)) {
+                    ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
+                    TblColRef colRef = new TblColRef(sourceColumn);
+                    DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+                    Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+                    Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+                    int topNSize = topNCounters.size();
+                    while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                            mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                            mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                        byte[] oldBuf = newKeyBodyBuf;
+                        newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                        System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
                     }
 
-                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()));
-                    bufOffset += mergedDict.getSizeOfId();
+                    for (Counter<ByteArray> c : topNCounters) {
+                        int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                        int idInMergedDict;
+                        int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                        if (size < 0) {
+                            idInMergedDict = mergedDict.nullId();
+                        } else {
+                            idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
+                        }
+
+                        BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                        c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()));
+                        bufOffset += mergedDict.getSizeOfId();
+                    }
                 }
             }