Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:33 UTC

[kylin] 20/30: KYLIN-3991 Provide more config options for Flink cube engine

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit be1944846f73bde4f85079686d71e3bcc3123670
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Jun 12 12:58:43 2019 +0800

    KYLIN-3991 Provide more config options for Flink cube engine
---
 .../src/main/resources/kylin-defaults.properties   |  3 +++
 .../kylin/engine/flink/FlinkCubingByLayer.java     | 19 +++++++++++-----
 .../kylin/engine/flink/FlinkCubingMerge.java       | 12 ++++++++++
 .../apache/kylin/engine/flink/FlinkExecutable.java | 26 +++++++++++++++++-----
 .../engine/flink/FlinkOnYarnConfigMapping.java     | 10 +++++++++
 .../org/apache/kylin/engine/flink/FlinkUtil.java   | 10 ---------
 .../engine/flink/FlinkOnYarnConfigMappingTest.java | 18 ---------------
 7 files changed, 59 insertions(+), 39 deletions(-)

diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 174dbcc..8b20a92 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -347,6 +347,9 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
 kylin.engine.flink-conf.jobmanager.heap.size=2G
 kylin.engine.flink-conf.taskmanager.heap.size=4G
 kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=1
+kylin.engine.flink-conf.taskmanager.memory.preallocate=false
+kylin.engine.flink-conf.job.parallelism=1
+kylin.engine.flink-conf.program.enableObjectReuse=false
 
 ### QUERY PUSH DOWN ###
 
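The three new defaults mirror Flink's own defaults (memory preallocation off, parallelism 1, object reuse off), so out-of-the-box behavior is unchanged; they exist so users can override them per deployment. FlinkExecutable routes each one differently: taskmanager.memory.preallocate becomes a -yD dynamic property, job.parallelism becomes the job-wide -p flag, and program.enableObjectReuse is stripped of the "program." prefix and passed to the cube job as a program argument. A minimal sketch of the prefix-based extraction this relies on (the class name and Properties source are illustrative, not Kylin's actual API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    // Hypothetical helper: collect "kylin.engine.flink-conf." overrides into the
    // plain key/value map that FlinkExecutable routes to the flink CLI.
    public class FlinkConfExtractor {
        private static final String PREFIX = "kylin.engine.flink-conf.";

        public static Map<String, String> extract(Properties kylinProps) {
            Map<String, String> flinkConfs = new HashMap<>();
            for (String name : kylinProps.stringPropertyNames()) {
                if (name.startsWith(PREFIX)) {
                    // "kylin.engine.flink-conf.job.parallelism" -> "job.parallelism"
                    flinkConfs.put(name.substring(PREFIX.length()), kylinProps.getProperty(name));
                }
            }
            return flinkConfs;
        }
    }
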
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 1393cfc..e89c5ba 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -90,6 +90,8 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+            .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
 
     private Options options;
 
@@ -101,6 +103,7 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_ENABLE_OBJECT_REUSE);
     }
 
     @Override
@@ -116,6 +119,12 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+        boolean enableObjectReuse = false;
+        if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
+            enableObjectReuse = true;
+        }
 
         Job job = Job.getInstance();
 
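Note that the parsing above is presence-based: the flag is set whenever any non-empty value arrives, so passing "-enableObjectReuse false" still enables reuse. A value-sensitive alternative would look like this minimal sketch (parseFlag is a hypothetical helper, not part of the patch):

    /** Interpret an option value as a boolean flag; null, empty and "false" all disable. */
    static boolean parseFlag(String optValue) {
        return optValue != null && Boolean.parseBoolean(optValue.trim());
    }
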
@@ -155,6 +164,9 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        if (enableObjectReuse) {
+            env.getConfig().enableObjectReuse();
+        }
         env.getConfig().registerKryoType(PercentileCounter.class);
         env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);
 
@@ -178,19 +190,16 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
         DataSet<Tuple2<ByteArray, Object[]>>[] allDataSets = new DataSet[totalLevels + 1];
         int level = 0;
-        int partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
 
         // aggregate to calculate base cuboid
-        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction).setParallelism(partition);
+        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction);
 
         sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
 
         CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
 
         for (level = 1; level <= totalLevels; level++) {
-            partition = FlinkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
-
-            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction).setParallelism(partition);
+            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction);
             if (envConfig.isFlinkSanityCheckEnabled()) {
                 sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
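
Two things change in this class beyond the new option. First, the per-layer setParallelism() calls (and the estimateLayerPartitionNum helper backing them) are gone; parallelism now comes from the single -p flag that FlinkExecutable derives from kylin.engine.flink-conf.job.parallelism. Second, once enableObjectReuse() is set, Flink may pass the same record instance to successive user-function calls, so reduce functions must fold into and return their inputs rather than cache references across invocations. A minimal sketch of a reuse-safe reducer (illustrative, not from the Kylin code base):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // With enableObjectReuse(), Flink may hand the same Tuple2 instance to
    // successive reduce() calls: mutate and return an input, never stash it.
    public class SafeSumReducer implements ReduceFunction<Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
            a.f1 = a.f1 + b.f1; // fold b into a instead of allocating a new tuple
            return a;
        }
    }
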
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 6a4ed66..3f2a44d 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -81,6 +81,8 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
             .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+            .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
 
     private Options options;
 
@@ -94,6 +96,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_ENABLE_OBJECT_REUSE);
     }
 
     @Override
@@ -108,6 +111,12 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+        boolean enableObjectReuse = false;
+        if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
+            enableObjectReuse = true;
+        }
 
         Job job = Job.getInstance();
         FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
@@ -127,6 +136,9 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        if (enableObjectReuse) {
+            env.getConfig().enableObjectReuse();
+        }
         env.getConfig().registerKryoType(PercentileCounter.class);
         env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);
 
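FlinkCubingMerge gains the same -enableObjectReuse option and the same presence-based parsing as FlinkCubingByLayer above, so the same caveat applies: any non-empty value, including "false", enables reuse.
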
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 6afd4b3..b883d01 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -188,20 +188,34 @@ public class FlinkExecutable extends AbstractExecutable {
                 flinkSpecificConfs.putAll(flinkConfs);
             }
 
+            int parallelism = 1;
             for (Map.Entry<String, String> entry : flinkConfs.entrySet()) {
-                if (!FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.containsKey(entry.getKey())) {
-                    logger.warn("Unsupported Flink configuration pair : key[%s], value[%s]", entry.getKey(), entry.getValue());
+                if (!(FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.containsKey(entry.getKey())
+                        || entry.getKey().startsWith("program") || entry.getKey().startsWith("job"))) {
+                    logger.error("Unsupported Flink configuration pair : key[{}], value[{}]", entry.getKey(), entry.getValue());
                     throw new IllegalArgumentException("Unsupported Flink configuration pair : key["
                             + entry.getKey() + "], value[" + entry.getValue() + "]");
                 }
 
-                String onYarnConfigOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
-                sb.append(" ").append(onYarnConfigOptionKey).append(" ").append(entry.getValue());
+                if (entry.getKey().equals("job.parallelism")) {
+                    parallelism = Integer.parseInt(entry.getValue());
+                } else if (entry.getKey().startsWith("program.")) {
+                    getParams().put(entry.getKey().substring("program.".length()), entry.getValue());
+                } else {
+                    String configOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
+                    //flink on yarn dynamic property (pattern : -yD taskmanager.network.memory.min=536346624)
+                    if (configOptionKey.startsWith("-yD")) {
+                        sb.append(" ").append(configOptionKey).append("=").append(entry.getValue());
+                    } else {
+                        //flink on yarn short option (pattern : -yn 1)
+                        sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue());
+                    }
+                }
             }
 
-            sb.append(" -c org.apache.kylin.common.util.FlinkEntry %s %s ");
+            sb.append(" -c org.apache.kylin.common.util.FlinkEntry -p %s %s %s ");
             final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
-                    KylinConfig.getFlinkHome(), jars, formatArgs());
+                    KylinConfig.getFlinkHome(), parallelism, jars, formatArgs());
             logger.info("cmd: " + cmd);
             final ExecutorService executorService = Executors.newSingleThreadExecutor();
             final CliCommandExecutor exec = new CliCommandExecutor();
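
The loop above routes each kylin.engine.flink-conf.* key one of three ways: job.parallelism is collected and emitted once as the job-wide -p flag, program.* keys are stripped of the prefix and handed to the cube job as program arguments, and every other key must resolve through FlinkOnYarnConfigMapping to a YARN-session option. A standalone sketch of that routing rule (class and parameter names are illustrative; the real mapping lives in FlinkOnYarnConfigMapping.flinkOnYarnConfigMap):

    import java.util.Map;

    public class FlinkCliArgRouter {
        /** Build the CLI fragment for one override map; throws on unmapped keys. */
        public static String route(Map<String, String> flinkConfs, Map<String, String> onYarnMap,
                                   Map<String, String> programArgs) {
            StringBuilder sb = new StringBuilder();
            int parallelism = 1;
            for (Map.Entry<String, String> e : flinkConfs.entrySet()) {
                if (e.getKey().equals("job.parallelism")) {
                    parallelism = Integer.parseInt(e.getValue());
                } else if (e.getKey().startsWith("program.")) {
                    programArgs.put(e.getKey().substring("program.".length()), e.getValue());
                } else if (onYarnMap.containsKey(e.getKey())) {
                    String opt = onYarnMap.get(e.getKey());
                    // "-yD ..." dynamic properties take key=value;
                    // "-yjm"-style short options take a space-separated value
                    sb.append(" ").append(opt).append(opt.startsWith("-yD") ? "=" : " ").append(e.getValue());
                } else {
                    throw new IllegalArgumentException("Unsupported Flink configuration pair : key["
                            + e.getKey() + "], value[" + e.getValue() + "]");
                }
            }
            return sb.append(" -p ").append(parallelism).toString();
        }
    }
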
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
index 721b9a9..fe64f0b 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMapping.java
@@ -64,6 +64,16 @@ public class FlinkOnYarnConfigMapping {
                 flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-ys");
             }
         }
+
+        ConfigOption<Boolean> tmMemoryPreallocate = TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE;
+        flinkOnYarnConfigMap.put(tmMemoryPreallocate.key(), "-yD taskmanager.memory.preallocate");
+        if (tmMemoryPreallocate.hasDeprecatedKeys()) {
+            Iterator<String> deprecatedKeyIterator = tmMemoryPreallocate.deprecatedKeys().iterator();
+            while (deprecatedKeyIterator.hasNext()) {
+                flinkOnYarnConfigMap.put(deprecatedKeyIterator.next(), "-yD taskmanager.memory.preallocate");
+            }
+        }
+
     }
 
 }
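
With the new mapping entry, a kylin.properties override such as kylin.engine.flink-conf.taskmanager.memory.preallocate=false ends up on the flink CLI as -yD taskmanager.memory.preallocate=false. An illustrative lookup against the map populated above (requires the engine-flink module on the classpath):

    public class MappingLookupDemo {
        public static void main(String[] args) {
            String cliOption = org.apache.kylin.engine.flink.FlinkOnYarnConfigMapping
                    .flinkOnYarnConfigMap.get("taskmanager.memory.preallocate");
            System.out.println(cliOption); // prints: -yD taskmanager.memory.preallocate
        }
    }
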
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
index e7b1a49..884b25d 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -94,16 +94,6 @@ public class FlinkUtil {
         return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, StringUtil.join(inputFolders, ",")));
     }
 
-
-    public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
-        double baseCuboidSize = statsReader.estimateLayerSize(level);
-        float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
-        int partition = (int) (baseCuboidSize / partitionCutMB);
-        partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
-        partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
-        return partition;
-    }
-
     public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
         double totalSize = 0;
         for (double x: statsReader.getCuboidSizeMap().values()) {
diff --git a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
index b26fd04..4355983 100644
--- a/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
+++ b/engine-flink/src/test/java/org/apache/kylin/engine/flink/FlinkOnYarnConfigMappingTest.java
@@ -22,9 +22,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -32,15 +30,6 @@ import java.util.Map;
  */
 public class FlinkOnYarnConfigMappingTest {
 
-    private static List<String> flinkOnYarnConfigOptionKeys;
-
-    static {
-        flinkOnYarnConfigOptionKeys = new ArrayList<>();
-        flinkOnYarnConfigOptionKeys.add("-yjm");
-        flinkOnYarnConfigOptionKeys.add("-ytm");
-        flinkOnYarnConfigOptionKeys.add("-ys");
-    }
-
     @Test
     public void testFlinkOnYarnJMMemOption() {
         String flinkonYarnJMMemOption = "-yjm";
@@ -116,11 +105,4 @@ public class FlinkOnYarnConfigMappingTest {
         }
     }
 
-    @Test
-    public void testFlinkOnYarnConfigOptionKeySet() {
-        for (String flinkOnYarnCnfigOptionKey : FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.values()) {
-            Assert.assertTrue(flinkOnYarnConfigOptionKeys.contains(flinkOnYarnCnfigOptionKey));
-        }
-    }
-
 }
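
The dropped testFlinkOnYarnConfigOptionKeySet asserted that every mapped value was one of the short options -yjm/-ytm/-ys; with "-yD taskmanager.memory.preallocate" now among the map's values, that whitelist no longer holds, so the test is removed along with its hardcoded list.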