Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:34 UTC

[kylin] 21/30: KYLIN-4050 Fix a CLI option parse error in FlinkMergingDictionary and some minor code refactoring

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d50c0c1e923e97eb1bc3ebd40763e89be324d8e9
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Jun 18 20:14:56 2019 +0800

    KYLIN-4050 Fix a CLI option parse error in FlinkMergingDictionary and some minor code refactoring
---
 .../kylin/engine/flink/FlinkCubingByLayer.java     |  2 +-
 .../kylin/engine/flink/FlinkCubingMerge.java       |  6 ++---
 .../apache/kylin/engine/flink/FlinkExecutable.java | 27 ++++++++--------------
 .../kylin/engine/flink/FlinkMergingDictionary.java | 16 +++++++++++--
 .../org/apache/kylin/engine/flink/FlinkUtil.java   | 14 -----------
 5 files changed, 26 insertions(+), 39 deletions(-)
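
The parse error this commit fixes: an "-enableObjectReuse" argument reaches FlinkMergingDictionary on its command line (presumably appended by the job submitter for every Flink step), but the class never registered that option, so Commons CLI rejects the arguments before the step can run. A minimal sketch of the failure mode, assuming Commons CLI's GnuParser (which Kylin's OptionsHelper wraps); the class name and argument values are illustrative:

    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;

    public class UnregisteredOptionDemo {
        public static void main(String[] args) throws Exception {
            Options options = new Options();
            options.addOption(OptionBuilder.withArgName("segmentId").hasArg()
                    .isRequired(true).withDescription("segment id").create("segmentId"));
            // "-enableObjectReuse" arrives on the command line but was never added
            // to 'options', so parse() throws UnrecognizedOptionException -- this
            // is the CLI error the commit fixes by registering the option.
            new GnuParser().parse(options, new String[] {
                    "-segmentId", "uuid-1234", "-enableObjectReuse", "true" });
        }
    }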

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index e89c5ba..1551788 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -91,7 +91,7 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
-            .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
+            .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
 
     private Options options;
 
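With isRequired(false), Commons CLI no longer fails when the flag is absent; getOptionValue(...) simply returns null, which the existing null/empty guard already handles. A two-line sketch of that semantics (plain Commons CLI, illustrative variable names):

    // Parsing succeeds whether or not the optional flag is present;
    // getOptionValue(...) returns null when it was omitted.
    CommandLine cmdLine = new GnuParser().parse(options, args);
    String reuseOpt = cmdLine.getOptionValue("enableObjectReuse"); // null if omitted
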
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 3f2a44d..b44ef03 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -82,7 +82,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
-            .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
+            .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
 
     private Options options;
 
@@ -111,7 +111,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
-        String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+        final String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
 
         boolean enableObjectReuse = false;
         if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
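
Note that this guard (shown in full in the FlinkMergingDictionary hunk below) enables reuse for any non-empty value, so "-enableObjectReuse false" still turns it on. A stricter variant, which is not what this commit does, would parse the text as a boolean:

    // Hypothetical stricter parsing -- not the commit's behavior:
    boolean enableObjectReuse = enableObjectReuseOptValue != null
            && Boolean.parseBoolean(enableObjectReuseOptValue.trim());
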
@@ -183,7 +183,6 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
                 unionedDataSet
                         .groupBy(0)
                         .reduce(new MeasureReduceFunction(aggregators))
-                        .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
                         .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
                         .output(hadoopOF);
             }
@@ -222,7 +221,6 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
                     unionedDataSet
                             .groupBy(0)
                             .reduce(new MeasureReduceFunction(aggregators))
-                            .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
                             .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
                             .output(hadoopOF);
                 }
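
With the explicit setParallelism call removed, the reduce inherits the ExecutionEnvironment's parallelism instead of a value derived from cuboid statistics. A sketch of how DataSet parallelism cascades, with illustrative numbers:

    // Operators inherit the environment's parallelism unless overridden
    // per operator; both values below are illustrative.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(64);                 // default for every operator
    unionedDataSet
            .groupBy(0)
            .reduce(new MeasureReduceFunction(aggregators))
            .setParallelism(32)             // per-operator override, if still wanted
            .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
            .output(hadoopOF);
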
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index b883d01..129b3cf 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -54,7 +54,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -219,21 +218,17 @@ public class FlinkExecutable extends AbstractExecutable {
             logger.info("cmd: " + cmd);
             final ExecutorService executorService = Executors.newSingleThreadExecutor();
             final CliCommandExecutor exec = new CliCommandExecutor();
-            final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
-                @Override
-                public void onLogEvent(String infoKey, Map<String, String> info) {
-                    // only care three properties here
-                    if (ExecutableConstants.FLINK_JOB_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
-                        getManager().addJobInfo(getId(), info);
-                    }
+            final PatternedLogger patternedLogger = new PatternedLogger(logger, (String infoKey, Map<String, String> info) -> {
+                // we only care about three properties here
+                if (ExecutableConstants.FLINK_JOB_ID.equals(infoKey)
+                        || ExecutableConstants.YARN_APP_ID.equals(infoKey)
+                        || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+                    getManager().addJobInfo(getId(), info);
                 }
             });
 
-            Callable callable = new Callable<Pair<Integer, String>>() {
-                @Override
-                public Pair<Integer, String> call() throws Exception {
+            try {
+                Future<Pair<Integer, String>> future = executorService.submit(() -> {
                     Pair<Integer, String> result;
                     try {
                         result = exec.execute(cmd, patternedLogger);
@@ -242,11 +237,7 @@ public class FlinkExecutable extends AbstractExecutable {
                         result = new Pair<>(-1, e.getMessage());
                     }
                     return result;
-                }
-            };
-
-            try {
-                Future<Pair<Integer, String>> future = executorService.submit(callable);
+                });
                 Pair<Integer, String> result = null;
                 while (!isDiscarded() && !isPaused()) {
                     if (future.isDone()) {
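
Both PatternedLogger.ILogListener and Callable are single-method interfaces, so the anonymous classes collapse into lambdas; submitting the lambda inline also gives the Future its proper Pair<Integer, String> type instead of the raw Callable used before. A sketch of the submit-and-poll pattern, with a stub standing in for exec.execute(cmd, patternedLogger):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.kylin.common.util.Pair;

    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<Pair<Integer, String>> future = pool.submit(() ->
            new Pair<>(0, "ok"));          // stand-in for exec.execute(...)
    while (!future.isDone()) {
        Thread.sleep(5000);                // the real loop also watches job state
    }
    Pair<Integer, String> result = future.get();
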
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
index 3839535..f88eb0c 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
@@ -90,6 +90,8 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
             .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
     public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
             .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+    public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+            .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
 
     private Options options;
 
@@ -101,6 +103,7 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
         options.addOption(OPTION_MERGE_SEGMENT_IDS);
         options.addOption(OPTION_OUTPUT_PATH_DICT);
         options.addOption(OPTION_OUTPUT_PATH_STAT);
+        options.addOption(OPTION_ENABLE_OBJECT_REUSE);
     }
 
     @Override
@@ -116,10 +119,20 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
         final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
         final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
         final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
+        final String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+        boolean enableObjectReuse = false;
+        if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
+            enableObjectReuse = true;
+        }
 
         final Job job = Job.getInstance();
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        if (enableObjectReuse) {
+            env.getConfig().enableObjectReuse();
+        }
+
         HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
 
         final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
@@ -143,8 +156,7 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
         DataSource<Integer> indexDS = env.fromCollection(indexs);
 
         DataSet<Tuple2<Text, Text>> colToDictPathDS = indexDS.map(new MergeDictAndStatsFunction(cubeName,
-                metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf))
-                .setParallelism(columnLength + 1);
+                metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf));
 
         FlinkUtil.setHadoopConfForCuboid(job, null, null);
         HadoopOutputFormat<Text, Text> hadoopOF =
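
FlinkMergingDictionary now mirrors the other two jobs: the flag is registered, parsed, and applied, and the dictionary-merge map likewise drops its explicit parallelism (columnLength + 1) in favor of the environment default. Object reuse is a documented Flink trade-off: the runtime may hand the same record instance to a user function repeatedly, which cuts allocations and GC pressure but requires functions not to cache or mutate their inputs across calls.

    // One line to opt in; user functions must then copy any input record
    // they intend to keep beyond the current call.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableObjectReuse();
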
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
index 884b25d..4ea5fc2 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -33,14 +33,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
@@ -94,18 +92,6 @@ public class FlinkUtil {
         return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, StringUtil.join(inputFolders, ",")));
     }
 
-    public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
-        double totalSize = 0;
-        for (double x: statsReader.getCuboidSizeMap().values()) {
-            totalSize += x;
-        }
-        float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
-        int partition = (int) (totalSize / partitionCutMB);
-        partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
-        partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
-        return partition;
-    }
-
     public static void setHadoopConfForCuboid(Job job, CubeSegment segment, String metaUrl) throws Exception {
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
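
The two setParallelism calls removed above were the only callers of estimateTotalPartitionNum, so the helper is now dead code and goes with them. For reference, it sized the merge reduce by dividing total cuboid size by a configured cut size and clamping the result; a stand-alone restatement with illustrative numbers (the values are examples, not Kylin defaults):

    // partition = clamp(totalSizeMB / partitionCutMB, minPartition, maxPartition)
    static int estimatePartitions(double totalSizeMB, float partitionCutMB,
                                  int minPartition, int maxPartition) {
        int partition = (int) (totalSizeMB / partitionCutMB);
        return Math.min(maxPartition, Math.max(minPartition, partition));
    }
    // estimatePartitions(2048, 32f, 1, 500) == 64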