You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/20 11:28:10 UTC

[45/50] [abbrv] kylin git commit: KYLIN-2269 Reduce MR memory usage for global dict

KYLIN-2269 Reduce MR memory usage for global dict

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f50c0c87
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f50c0c87
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f50c0c87

Branch: refs/heads/master-cdh5.7
Commit: f50c0c87373e4abacd7106d527df4b885f0a88ea
Parents: 878107e
Author: kangkaisen <ka...@live.com>
Authored: Tue Dec 6 19:26:09 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Dec 19 17:52:41 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++++
 .../org/apache/kylin/cube/model/CubeDesc.java   | 22 ++++++++++++++++++++
 .../cube/model/CubeJoinedFlatTableDesc.java     | 15 ++++++++-----
 .../cube/model/CubeJoinedFlatTableEnrich.java   |  7 ++++++-
 .../org/apache/kylin/job/JoinedFlatTable.java   | 13 ++++++++++--
 .../metadata/model/IJoinedFlatTableDesc.java    | 11 ++++++----
 6 files changed, 60 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5153562..01d1d36 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -492,6 +492,14 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.source.hive.beeline-params", "");
     }
 
+    /**
+     * Name of a dictionary column to CLUSTER BY when redistributing the intermediate flat Hive table.
+     * No default is supplied for this property, so callers must be prepared for it to be unset.
+     */
+    public String getFlatHiveTableClusterByDictColumn() {
+        return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
+    }
+
     @Deprecated
     public String getCreateFlatHiveTableMethod() {
         return getOptional("kylin.source.hive.create-flat-table-method", "1");

http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index f6b68af..3b8d034 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -1090,6 +1090,42 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
 
         return null;
     }
+
+    /**
+     * Returns a dictionary column that can be used to CLUSTER BY the intermediate flat table,
+     * to reduce memory footprint in base cuboid for global dict.
+     *
+     * <p>The column is the dictionary column whose name equals (ignoring case) the value of
+     * {@code kylin.source.hive.flat-table-cluster-by-dict-column}.
+     *
+     * @return the matching dictionary column, or {@code null} if a distributed-by column is
+     *         defined, the cube has no dictionaries, the configured name is null, or nothing matches
+     */
+    // TODO handle more than one ultra high cardinality columns use global dict in one cube
+    TblColRef getClusteredByColumn() {
+        // An explicit distributed-by column takes precedence over clustering.
+        if (getDistributedByColumn() != null) {
+            return null;
+        }
+
+        if (dictionaries == null) {
+            return null;
+        }
+
+        String clusterByColumn = config.getFlatHiveTableClusterByDictColumn();
+        if (clusterByColumn == null) {
+            return null;
+        }
+
+        for (DictionaryDesc dictDesc : dictionaries) {
+            TblColRef col = dictDesc.getColumnRef();
+            if (col.getName().equalsIgnoreCase(clusterByColumn)) {
+                return col;
+            }
+        }
+
+        return null;
+    }
 
     public String getDictionaryBuilderClass(TblColRef col) {
         if (dictionaries == null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index f37f86e..94e1a7c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -49,11 +49,11 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     public CubeJoinedFlatTableDesc(CubeDesc cubeDesc) {
         this(cubeDesc, null);
     }
-    
+
     public CubeJoinedFlatTableDesc(CubeSegment cubeSegment) {
         this(cubeSegment.getCubeDesc(), cubeSegment);
     }
-    
+
     private CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment /* can be null */) {
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
@@ -68,7 +68,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
             return "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getUuid().replaceAll("-", "_");
         }
     }
-    
+
     protected final void initAddColumn(TblColRef col) {
         if (columnIndexMap.containsKey(col))
             return;
@@ -77,10 +77,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         columnIndexMap.put(col, columnIndex);
         columnList.add(col);
         columnCount = columnIndexMap.size();
-        
+
         Preconditions.checkState(columnIndexMap.size() == columnList.size());
     }
-    
+
     // check what columns from hive tables are required, and index them
     protected void initParseCubeDesc() {
         for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived(false)) {
@@ -165,4 +165,13 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         return cubeSegment;
     }
 
+    /**
+     * Returns the column to CLUSTER BY when redistributing the flat table, as selected by
+     * {@code CubeDesc.getClusteredByColumn()}; may be {@code null}.
+     */
+    @Override
+    public TblColRef getClusterBy() {
+        return cubeDesc.getClusteredByColumn();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 979af76..a1312b5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -42,7 +42,7 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
         // != works due to object cache
         if (cubeDesc.getModel() != flatDesc.getDataModel())
             throw new IllegalArgumentException();
-        
+
         this.cubeDesc = cubeDesc;
         this.flatDesc = flatDesc;
         parseCubeDesc();
@@ -132,4 +132,12 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
         return flatDesc.getSegment();
     }
 
+    /**
+     * Delegates to the wrapped flat table descriptor, as {@link #getSegment()} does.
+     */
+    @Override
+    public TblColRef getClusterBy() {
+        return flatDesc.getClusterBy();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 9fa0961..9ed563f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -171,6 +171,13 @@ public class JoinedFlatTable {
         }
     }
 
+    /**
+     * Appends a Hive CLUSTER BY clause on the given column and terminates the statement.
+     */
+    private static void appendClusterStatement(StringBuilder sql, TblColRef clusterCol) {
+        sql.append(" CLUSTER BY ").append(colName(clusterCol)).append(";\n");
+    }
+
     private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
         boolean hasCondition = false;
         StringBuilder whereBuilder = new StringBuilder();
@@ -219,8 +226,15 @@ public class JoinedFlatTable {
         StringBuilder sql = new StringBuilder();
         sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName);
 
-        TblColRef distDcol = flatDesc.getDistributedBy();
-        appendDistributeStatement(sql, distDcol);
+        // Hive CLUSTER BY is DISTRIBUTE BY plus SORT BY on the same column. Use it when the
+        // flat table nominates a cluster column; otherwise keep the DISTRIBUTE BY behavior.
+        TblColRef clusterCol = flatDesc.getClusterBy();
+        if (clusterCol != null) {
+            appendClusterStatement(sql, clusterCol);
+        } else {
+            appendDistributeStatement(sql, flatDesc.getDistributedBy());
+        }
+
         return sql.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f50c0c87/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index ffa2680..b545e50 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -27,16 +27,23 @@ public interface IJoinedFlatTableDesc {
     String getTableName();
 
     DataModelDesc getDataModel();
-    
+
     List<TblColRef> getAllColumns();
-    
+
     int getColumnIndex(TblColRef colRef);
 
     long getSourceOffsetStart();
-    
+
     long getSourceOffsetEnd();
-    
+
     TblColRef getDistributedBy();
 
+    /**
+     * Returns the column to CLUSTER BY when redistributing the flat table, or {@code null}
+     * if none. When non-null it takes precedence over {@link #getDistributedBy()} for
+     * redistributing the flat table.
+     */
+    TblColRef getClusterBy();
+
     ISegment getSegment();
 }