You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/27 03:32:45 UTC

[42/50] [abbrv] kylin git commit: KYLIN-1476 Support measure dictionary in 1.x

KYLIN-1476 Support measure dictionary in 1.x


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d517530c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d517530c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d517530c

Branch: refs/heads/1.3.x
Commit: d517530c93dc3176a52295af297afdfefdfc4b62
Parents: 438a6ad
Author: wangxiaoyu <ro...@gmail.com>
Authored: Tue Mar 8 23:39:51 2016 +0800
Committer: Xiaoyu Wang <wa...@apache.org>
Committed: Mon Mar 21 20:00:57 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 26 ++++++++++++-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  | 14 +++----
 .../kylin/job/cube/MergeDictionaryStep.java     | 17 ++++-----
 .../hadoop/cube/FactDistinctColumnsMapper.java  | 40 +++++---------------
 .../hadoop/cube/FactDistinctColumnsReducer.java | 12 ++----
 .../hadoop/hive/CubeJoinedFlatTableDesc.java    | 18 +++++++--
 6 files changed, 65 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 88ce4f1..4615762 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -194,11 +194,11 @@ public class CubeManager implements IRealizationProvider {
 
     public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, String factColumnsPath) throws IOException {
         CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-        if (!cubeDesc.getRowkey().isUseDictionary(col))
+        if (!cubeDesc.getAllColumnsNeedDictionary().contains(col))
             return null;
 
         DictionaryManager dictMgr = getDictionaryManager();
-        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factColumnsPath);
+        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), "true", col, factColumnsPath);
 
         if (dictInfo != null) {
             cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
@@ -791,4 +791,26 @@ public class CubeManager implements IRealizationProvider {
         return getCube(name);
     }
 
+
+    /**
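+     * Get the columns whose dictionaries must be built from the fact table (i.e. the column exists on the fact table and is not a foreign key).
+     * @param cubeDesc the cube descriptor whose dictionary columns are examined
+     * @return the dictionary columns whose source table, as decided by the DictionaryManager, is the fact table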
+     * @throws IOException
+     */
+    public List<TblColRef> getAllDictColumnsOnFact(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> dictionaryColumns = cubeDesc.getAllColumnsNeedDictionary();
+
+        List<TblColRef> factDictCols = new ArrayList<TblColRef>();
+        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+        for (int i = 0; i < dictionaryColumns.size(); i++) {
+            TblColRef col = dictionaryColumns.get(i);
+
+            String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), "true", col, null)[0];
+            if (cubeDesc.getModel().isFactTable(scanTable)) {
+                factDictCols.add(col);
+            }
+        }
+        return factDictCols;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 3cdaa93..bcbd496 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -44,15 +44,13 @@ public class DictionaryGeneratorCLI {
     private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String factColumnsPath) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(config);
 
-        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    logger.info("Building dictionary for " + col);
-                    cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
-                }
-            }
+        // dictionary
+        for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionary()) {
+            logger.info("Building dictionary for " + col);
+            cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
+        }
 
+        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
             // build snapshot
             if (dim.getTable() != null && !dim.getTable().equalsIgnoreCase(cubeSeg.getCubeDesc().getFactTable())) {
                 // CubeSegment seg = cube.getTheOnlySegment();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
index 8f77558..47151a7 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
@@ -107,16 +107,13 @@ public class MergeDictionaryStep extends AbstractExecutable {
         DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
 
         CubeDesc cubeDesc = cube.getDescriptor();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                        colsNeedMeringDict.add(col);
-                    } else {
-                        colsNeedCopyDict.add(col);
-                    }
-                }
+
+        for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
+            String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), "true", col, null)[0];
+            if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
+                colsNeedMeringDict.add(col);
+            } else {
+                colsNeedCopyDict.add(col);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
index 72802aa..b70113d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,10 +33,7 @@ import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
@@ -51,7 +47,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec
     private String cubeName;
     private CubeInstance cube;
     private CubeDesc cubeDesc;
-    private int[] factDictCols;
+    List<TblColRef> factDictCols;
 
     private CubeJoinedFlatTableDesc intermediateTableDesc;
 
@@ -60,6 +56,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec
     private int errorRecordCounter;
 
     private HCatSchema schema = null;
+    protected int[] dictionaryColumnIndex;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -73,40 +70,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec
         cubeDesc = cube.getDescriptor();
         intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
 
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        List<TblColRef> columns = baseCuboid.getColumns();
-
-        ArrayList<Integer> factDictCols = new ArrayList<Integer>();
-        RowKeyDesc rowkey = cubeDesc.getRowkey();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-        for (int i = 0; i < columns.size(); i++) {
-            TblColRef col = columns.get(i);
-            if (rowkey.isUseDictionary(col) == false)
-                continue;
-
-            String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-            if (cubeDesc.getModel().isFactTable(scanTable)) {
-                factDictCols.add(i);
-            }
+        factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
+        dictionaryColumnIndex = new int[factDictCols.size()];
+        for (int i = 0; i < factDictCols.size(); i++) {
+            TblColRef colRef = factDictCols.get(i);
+            int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+            dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
         }
-        this.factDictCols = new int[factDictCols.size()];
-        for (int i = 0; i < factDictCols.size(); i++)
-            this.factDictCols[i] = factDictCols.get(i);
-
         schema = HCatInputFormat.getTableSchema(context.getConfiguration());
     }
 
     @Override
     public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
         try {
-
-            int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
             HCatFieldSchema fieldSchema = null;
-            for (int i : factDictCols) {
+            for (int i = 0; i < factDictCols.size(); i++) {
                 outputKey.set((short) i);
-                fieldSchema = schema.get(flatTableIndexes[i]);
+                fieldSchema = schema.get(dictionaryColumnIndex[i]);
                 Object fieldValue = record.get(fieldSchema.getName(), schema);
                 if (fieldValue == null)
                     continue;

http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 89f90ba..809ced2 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -31,14 +31,12 @@ import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 
@@ -47,8 +45,7 @@ import java.util.List;
  */
 public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
 
-    private List<TblColRef> columnList = new ArrayList<TblColRef>();
-
+    private List<TblColRef> factDictCols;
     @Override
     protected void setup(Context context) throws IOException {
         super.publishConfiguration(context.getConfiguration());
@@ -58,15 +55,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeDesc cubeDesc = cube.getDescriptor();
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        columnList = baseCuboid.getColumns();
+        factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
     }
 
     @Override
     public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        TblColRef col = columnList.get(key.get());
+        TblColRef col = factDictCols.get(key.get());
 
         HashSet<ByteArray> set = new HashSet<ByteArray>();
         for (Text textValue : values) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
index b2bed9f..72dd400 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
@@ -48,9 +48,12 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
 
     private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
 
+    private Map<String, Integer> columnIndexMap;
+
     public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
+        this.columnIndexMap = Maps.newHashMap();
         parseCubeDesc();
     }
 
@@ -73,10 +76,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
             this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
         }
 
-        Map<String, Integer> dimensionIndexMap = Maps.newHashMap();
         int columnIndex = 0;
         for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) {
-            dimensionIndexMap.put(colName(col.getCanonicalName()), columnIndex);
+            columnIndexMap.put(colName(col.getCanonicalName()), columnIndex);
             columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col));
             columnIndex++;
         }
@@ -86,7 +88,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         rowKeyColumnIndexes = new int[rowkeyColCount];
         for (int i = 0; i < rowkeyColCount; i++) {
             String colName = colName(cuboidColumns.get(i).getCanonicalName());
-            Integer dimIdx = dimensionIndexMap.get(colName);
+            Integer dimIdx = columnIndexMap.get(colName);
             if (dimIdx == null) {
                 throw new RuntimeException("Can't find column " + colName);
             }
@@ -108,6 +110,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
                     measureColumnIndexes[i][j] = contains(columnList, c);
                     if (measureColumnIndexes[i][j] < 0) {
                         measureColumnIndexes[i][j] = columnIndex;
+                        columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
                         columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
                         columnIndex++;
                     }
@@ -172,4 +175,13 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     private static String colName(String canonicalColName) {
         return canonicalColName.replace(".", "_");
     }
+
+    public int getColumnIndex(TblColRef colRef) {
+        String key = colName(colRef.getCanonicalName());
+        Integer index = columnIndexMap.get(key);
+        if (index == null)
+            throw new IllegalArgumentException("Column " + colRef.toString() + " wasn't found on flat table.");
+
+        return index.intValue();
+    }
 }