You are viewing a plain text version of this content. The canonical link was lost in the plain-text conversion of this archive page.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:33 UTC
[kylin] 20/30: KYLIN-3991 Provide more config options for Flink cube engine
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit be1944846f73bde4f85079686d71e3bcc3123670
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Jun 12 12:58:43 2019 +0800
KYLIN-3991 Provide more config options for Flink cube engine
---
.../src/main/resources/kylin-defaults.properties | 3 +++
.../kylin/engine/flink/FlinkCubingByLayer.java | 19 +++++++++++-----
.../kylin/engine/flink/FlinkCubingMerge.java | 12 ++++++++++
.../apache/kylin/engine/flink/FlinkExecutable.java | 26 +++++++++++++++++-----
.../engine/flink/FlinkOnYarnConfigMapping.java | 10 +++++++++
.../org/apache/kylin/engine/flink/FlinkUtil.java | 10 ---------
.../engine/flink/FlinkOnYarnConfigMappingTest.java | 18 ---------------
7 files changed, 59 insertions(+), 39 deletions(-)
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 174dbcc..8b20a92 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -347,6 +347,9 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
kylin.engine.flink-conf.jobmanager.heap.size=2G
kylin.engine.flink-conf.taskmanager.heap.size=4G
kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=1
+kylin.engine.flink-conf.taskmanager.memory.preallocate=false
+kylin.engine.flink-conf.job.parallelism=1
+kylin.engine.flink-conf.program.enableObjectReuse=false
### QUERY PUSH DOWN ###
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 1393cfc..e89c5ba 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -90,6 +90,8 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
.withDescription("Hive Intermediate Table").create("hiveTable");
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+ .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
private Options options;
@@ -101,6 +103,7 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_ENABLE_OBJECT_REUSE);
}
@Override
@@ -116,6 +119,12 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+ boolean enableObjectReuse = false;
+ if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
+ enableObjectReuse = true;
+ }
Job job = Job.getInstance();
@@ -155,6 +164,9 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (enableObjectReuse) {
+ env.getConfig().enableObjectReuse();
+ }
env.getConfig().registerKryoType(PercentileCounter.class);
env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);
@@ -178,19 +190,16 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
DataSet<Tuple2<ByteArray, Object[]>>[] allDataSets = new DataSet[totalLevels + 1];
int level = 0;
- int partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
// aggregate to calculate base cuboid
- allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction).setParallelism(partition);
+ allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction);
sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
for (level = 1; level <= totalLevels; level++) {
- partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
-
- allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction).setParallelism(partition);
+ allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction);
if (envConfig.isFlinkSanityCheckEnabled()) {
sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 6a4ed66..3f2a44d 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -81,6 +81,8 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
.isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+ .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
private Options options;
@@ -94,6 +96,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_ENABLE_OBJECT_REUSE);
}
@Override
@@ -108,6 +111,12 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+ boolean enableObjectReuse = false;
+ if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
+ enableObjectReuse = true;
+ }
Job job = Job.getInstance();
FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
@@ -127,6 +136,9 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (enableObjectReuse) {
+ env.getConfig().enableObjectReuse();
+ }
env.getConfig().registerKryoType(PercentileCounter.class);
env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 6afd4b3..b883d01 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -188,20 +188,34 @@ public class FlinkExecutable extends AbstractExecutable {
flinkSpecificConfs.putAll(flinkConfs);
}
+ int parallelism = 1;
for (Map.Entry<String, String> entry : flinkConfs.entrySet()) {
- if (!FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.containsKey(entry.getKey())) {
- logger.warn("Unsupported Flink configuration pair : key[%s], value[%s]", entry.getKey(), entry.getValue());
+ if (!(FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.containsKey(entry.getKey())
+ || entry.getKey().startsWith("program") || entry.getKey().startsWith("job"))) {
+ logger.error("Unsupported Flink configuration pair : key[%s], value[%s]", entry.getKey(), entry.getValue());
throw new IllegalArgumentException("Unsupported Flink configuration pair : key["
+ entry.getKey() + "], value[" + entry.getValue() + "]");
}
- String onYarnConfigOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
- sb.append(" ").append(onYarnConfigOptionKey).append(" ").append(entry.getValue());
+ if (entry.getKey().equals("job.parallelism")) {
+ parallelism = Integer.parseInt(entry.getValue());
+ } else if (entry.getKey().startsWith("program.")) {
+ getParams().put(entry.getKey().replaceAll("program.", ""), entry.getValue());
+ } else {
+ String configOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
+ //flink on yarn specific option (pattern : -yn 1)
+ if (configOptionKey.startsWith("-y")) {
+ sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue());
+ } else {
+ //flink on yarn specific option (pattern : -yD taskmanager.network.memory.min=536346624)
+ sb.append(" ").append(configOptionKey).append("=").append(entry.getValue());
+ }
+ }
}
- sb.append(" -c org.apache.kylin.common.util.FlinkEntry %s %s ");
+ sb.append(" -c org.apache.kylin.common.util.FlinkEntry -p %s %s %s ");
final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
- KylinConfig.getFlinkHome(), jars, formatArgs());
+ KylinConfig.getFlinkHome(), parallelism, jars, formatArgs());
logger.info("cmd: " + cmd);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final CliCommandExecutor exec = new CliCommandExecutor();
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
index 721b9a9..fe64f0b 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -64,6 +64,16 @@ public class FlinkOnYarnConfigMapping {
flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ys");
}
}
+
+ ConfigOption<Boolean> tmMemoryPreallocate = TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE;
+ flinkOnYarnConfigMap.put(tmMemoryPreallocate.key(), "-yD taskmanager.memory.preallocate");
+ if (taskSlotNumOption.hasDeprecatedKeys()) {
+ Iterator<String> deprecatedKeyIterator = tmMemoryPreallocate.deprecatedKeys().iterator();
+ while (deprecatedKeyIterator.hasNext()) {
+ flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-yD taskmanager.memory.preallocate");
+ }
+ }
+
}
}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
index e7b1a49..884b25d 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -94,16 +94,6 @@ public class FlinkUtil {
return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, StringUtil.join(inputFolders, ",")));
}
-
- public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
- double baseCuboidSize = statsReader.estimateLayerSize(level);
- float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
- int partition = (int) (baseCuboidSize / partitionCutMB);
- partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
- partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
- return partition;
- }
-
public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
double totalSize = 0;
for (double x: statsReader.getCuboidSizeMap().values()) {
diff --git a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
index b26fd04..4355983 100644
--- a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
+++ b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
@@ -22,9 +22,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.junit.Assert;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
/**
@@ -32,15 +30,6 @@ import java.util.Map;
*/
public class FlinkOnYarnConfigMappingTest {
- private static List<String> flinkOnYarnConfigOptionKeys;
-
- static {
- flinkOnYarnConfigOptionKeys = new ArrayList<>();
- flinkOnYarnConfigOptionKeys.add("-yjm");
- flinkOnYarnConfigOptionKeys.add("-ytm");
- flinkOnYarnConfigOptionKeys.add("-ys");
- }
-
@Test
public void testFlinkOnYarnJMMemOption() {
String flinkonYarnJMMemOption = "-yjm";
@@ -116,11 +105,4 @@ public class FlinkOnYarnConfigMappingTest {
}
}
- @Test
- public void testFlinkOnYarnConfigOptionKeySet() {
- for (String flinkOnYarnCnfigOptionKey : FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.values()) {
- Assert.assertTrue(flinkOnYarnConfigOptionKeys.contains(flinkOnYarnCnfigOptionKey));
- }
- }
-
}