You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/19 07:45:58 UTC

[kylin] branch master updated: KYLIN-3457 Distribute by multi column if not set distribute column

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 95d2a5b  KYLIN-3457 Distribute by multi column if not set distribute column
95d2a5b is described below

commit 95d2a5befaa596b2d502026d53a791fa31d20bbe
Author: chao long <wa...@qq.com>
AuthorDate: Thu Jul 19 11:37:12 2018 +0800

    KYLIN-3457 Distribute by multi column if not set distribute column
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +++
 .../java/org/apache/kylin/job/JoinedFlatTable.java | 36 +++++++++++++++++-----
 .../apache/kylin/source/hive/HiveInputBase.java    |  5 +--
 .../org/apache/kylin/source/hive/HiveMRInput.java  | 11 +++----
 .../apache/kylin/source/hive/HiveSparkInput.java   | 11 +++----
 5 files changed, 45 insertions(+), 22 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 637502e..b2331e1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -788,6 +788,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
     }
 
+    public int getHiveRedistributeColumnCount() {
+        return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3"));
+    }
+
     public int getDefaultVarcharPrecision() {
         int v = Integer.parseInt(getOptional("kylin.source.hive.default-varchar-precision", "256"));
         if (v < 1) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index a6c6daa..392323e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyColDesc;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -188,8 +191,13 @@ public class JoinedFlatTable {
         }
     }
 
-    private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
-        sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
+    private static void appendDistributeStatement(StringBuilder sql, List<TblColRef> redistCols) {
+        sql.append(" DISTRIBUTE BY ");
+        for (TblColRef redistCol : redistCols) {
+            sql.append(colName(redistCol, true)).append(",");
+        }
+        sql.deleteCharAt(sql.length() - 1);
+        sql.append(";\n");
     }
 
     private static void appendClusterStatement(StringBuilder sql, TblColRef clusterCol) {
@@ -252,16 +260,30 @@ public class JoinedFlatTable {
         return hiveDataType;
     }
 
-    public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) {
+    public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         final String tableName = flatDesc.getTableName();
         StringBuilder sql = new StringBuilder();
         sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName);
 
-        TblColRef clusterCol = flatDesc.getClusterBy();
-        if (clusterCol != null) {
-            appendClusterStatement(sql, clusterCol);
+        if (flatDesc.getClusterBy() != null) {
+            appendClusterStatement(sql, flatDesc.getClusterBy());
+        } else if (flatDesc.getDistributedBy() != null) {
+            appendDistributeStatement(sql, Lists.newArrayList(flatDesc.getDistributedBy()));
         } else {
-            appendDistributeStatement(sql, flatDesc.getDistributedBy());
+            int redistColumnCount = KylinConfig.getInstanceFromEnv().getHiveRedistributeColumnCount();
+
+            RowKeyColDesc[] rowKeyColDescs = cubeDesc.getRowkey().getRowKeyColumns();
+
+            if (rowKeyColDescs.length < redistColumnCount)
+                redistColumnCount = rowKeyColDescs.length;
+
+            List<TblColRef> redistColumns = Lists.newArrayListWithCapacity(redistColumnCount);
+
+            for (int i = 0; i < redistColumnCount; i++) {
+                redistColumns.add(rowKeyColDescs[i].getColRef());
+            }
+
+            appendDistributeStatement(sql, redistColumns);
         }
 
         return sql.toString();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index eae2e1c..9a2c242 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
@@ -81,11 +82,11 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
-            IJoinedFlatTableDesc flatDesc) {
+            IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
         step.setInitStatement(hiveInitStatements);
         step.setIntermediateTable(flatDesc.getTableName());
-        step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
+        step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc));
         CubingExecutableUtil.setCubeName(cubeName, step.getParams());
         step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
         return step;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index bfea632..d1b4fc9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,6 +28,7 @@ import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -118,8 +119,9 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
-                    .getConfig();
+            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            final KylinConfig cubeConfig = cubeInstance.getConfig();
+
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
             // create flat table first
@@ -127,9 +129,7 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
 
             // then count and redistribute
             if (cubeConfig.isHiveRedistributeEnabled()) {
-                if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
-                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
-                }
+                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
             }
 
             // special for hive
@@ -154,7 +154,6 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
             }
         }
 
-
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
             final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index 779835b..881be1a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.ISparkInput;
@@ -75,8 +76,8 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
-                    .getConfig();
+            CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            final KylinConfig cubeConfig = cubeInstance.getConfig();
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
             // create flat table first
@@ -84,9 +85,7 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
 
             // then count and redistribute
             if (cubeConfig.isHiveRedistributeEnabled()) {
-                if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
-                    jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
-                }
+                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
             }
 
             // special for hive
@@ -103,8 +102,6 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
             }
         }
 
-
-
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
             final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);