You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/19 07:45:58 UTC
[kylin] branch master updated: KYLIN-3457 Distribute by multi column if not set distribute column
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 95d2a5b KYLIN-3457 Distribute by multi column if not set distribute column
95d2a5b is described below
commit 95d2a5befaa596b2d502026d53a791fa31d20bbe
Author: chao long <wa...@qq.com>
AuthorDate: Thu Jul 19 11:37:12 2018 +0800
KYLIN-3457 Distribute by multi column if not set distribute column
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +++
.../java/org/apache/kylin/job/JoinedFlatTable.java | 36 +++++++++++++++++-----
.../apache/kylin/source/hive/HiveInputBase.java | 5 +--
.../org/apache/kylin/source/hive/HiveMRInput.java | 11 +++----
.../apache/kylin/source/hive/HiveSparkInput.java | 11 +++----
5 files changed, 45 insertions(+), 22 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 637502e..b2331e1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -788,6 +788,10 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
}
+ public int getHiveRedistributeColumnCount() { // number of leading rowkey columns for the default DISTRIBUTE BY (default "3")
+ return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3")); // NOTE(review): no range check here, unlike getDefaultVarcharPrecision; callers must cope with values < 1
+ }
+
public int getDefaultVarcharPrecision() {
int v = Integer.parseInt(getOptional("kylin.source.hive.default-varchar-precision", "256"));
if (v < 1) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index a6c6daa..392323e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyColDesc;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -188,8 +191,13 @@ public class JoinedFlatTable {
}
}
- private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
- sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
+ private static void appendDistributeStatement(StringBuilder sql, List<TblColRef> redistCols) {
+ String separator = " DISTRIBUTE BY "; // written before the first column only; "," between the rest
+ for (TblColRef redistCol : redistCols) {
+ sql.append(separator).append(colName(redistCol, true));
+ separator = ",";
+ }
+ sql.append(";\n"); // an empty list leaves a valid statement with no DISTRIBUTE BY clause
}
private static void appendClusterStatement(StringBuilder sql, TblColRef clusterCol) {
@@ -252,16 +260,30 @@ public class JoinedFlatTable {
return hiveDataType;
}
- public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) {
+ public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { // cubeDesc supplies rowkey columns when no cluster/distribute column is set
final String tableName = flatDesc.getTableName();
StringBuilder sql = new StringBuilder();
sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName);
- TblColRef clusterCol = flatDesc.getClusterBy();
- if (clusterCol != null) {
- appendClusterStatement(sql, clusterCol);
+ if (flatDesc.getClusterBy() != null) {
+ appendClusterStatement(sql, flatDesc.getClusterBy());
+ } else if (flatDesc.getDistributedBy() != null) {
+ appendDistributeStatement(sql, Lists.newArrayList(flatDesc.getDistributedBy()));
} else {
- appendDistributeStatement(sql, flatDesc.getDistributedBy());
+ int redistColumnCount = cubeDesc.getConfig().getHiveRedistributeColumnCount(); // cube-level config, not the global env config
+ RowKeyColDesc[] rowKeyColDescs = cubeDesc.getRowkey().getRowKeyColumns();
+ // use at most the rowkey's column count; a non-positive setting selects no columns (and avoids a negative capacity)
+ redistColumnCount = Math.max(0, Math.min(redistColumnCount, rowKeyColDescs.length));
+
+ List<TblColRef> redistColumns = Lists.newArrayListWithCapacity(redistColumnCount);
+ for (int i = 0; i < redistColumnCount; i++) {
+ redistColumns.add(rowKeyColDescs[i].getColRef());
+ }
+
+ if (redistColumns.isEmpty())
+ sql.append(";\n"); // nothing to distribute by: keep a plain, terminated INSERT ... SELECT
+ else
+ appendDistributeStatement(sql, redistColumns);
}
return sql.toString();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index eae2e1c..9a2c242 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
@@ -81,11 +82,11 @@ public class HiveInputBase {
}
protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
- IJoinedFlatTableDesc flatDesc) {
+ IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { // cubeDesc: rowkey columns used when flatDesc has no cluster/distribute column
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
step.setInitStatement(hiveInitStatements);
step.setIntermediateTable(flatDesc.getTableName());
- step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
+ step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc));
CubingExecutableUtil.setCubeName(cubeName, step.getParams());
step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
return step;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index bfea632..d1b4fc9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,6 +28,7 @@ import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -118,8 +119,9 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
- .getConfig();
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); // kept: its config gates redistribution, its descriptor feeds the rowkey columns
+ final KylinConfig cubeConfig = cubeInstance.getConfig();
+
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
// create flat table first
@@ -127,9 +129,7 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
// then count and redistribute
if (cubeConfig.isHiveRedistributeEnabled()) {
- if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
- }
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); // now unconditional: without a cluster/distribute column the statement distributes by leading rowkey columns
}
// special for hive
@@ -154,7 +154,6 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
}
}
-
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index 779835b..881be1a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.spark.ISparkInput;
@@ -75,8 +76,8 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
@Override
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
- final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
- .getConfig();
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); // kept: its config gates redistribution, its descriptor feeds the rowkey columns
+ final KylinConfig cubeConfig = cubeInstance.getConfig();
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
// create flat table first
@@ -84,9 +85,7 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
// then count and redistribute
if (cubeConfig.isHiveRedistributeEnabled()) {
- if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
- }
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); // now unconditional: without a cluster/distribute column the statement distributes by leading rowkey columns
}
// special for hive
@@ -103,8 +102,6 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
}
}
-
-
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);