You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:34 UTC
[kylin] 21/30: KYLIN-4050 Fix a CLI option parse error in
FlinkMergingDictionary and some minor code refactoring
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d50c0c1e923e97eb1bc3ebd40763e89be324d8e9
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Jun 18 20:14:56 2019 +0800
KYLIN-4050 Fix a CLI option parse error in FlinkMergingDictionary and some minor code refactoring
---
.../kylin/engine/flink/FlinkCubingByLayer.java | 2 +-
.../kylin/engine/flink/FlinkCubingMerge.java | 6 ++---
.../apache/kylin/engine/flink/FlinkExecutable.java | 27 ++++++++--------------
.../kylin/engine/flink/FlinkMergingDictionary.java | 16 +++++++++++--
.../org/apache/kylin/engine/flink/FlinkUtil.java | 14 -----------
5 files changed, 26 insertions(+), 39 deletions(-)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index e89c5ba..1551788 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -91,7 +91,7 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
- .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
+ .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
private Options options;
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 3f2a44d..b44ef03 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -82,7 +82,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
- .isRequired(true).withDescription("Enable object reuse").create("enableObjectReuse");
+ .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
private Options options;
@@ -111,7 +111,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
- String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+ final String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
boolean enableObjectReuse = false;
if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
@@ -183,7 +183,6 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
unionedDataSet
.groupBy(0)
.reduce(new MeasureReduceFunction(aggregators))
- .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
.map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
.output(hadoopOF);
}
@@ -222,7 +221,6 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
unionedDataSet
.groupBy(0)
.reduce(new MeasureReduceFunction(aggregators))
- .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
.map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
.output(hadoopOF);
}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index b883d01..129b3cf 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -54,7 +54,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -219,21 +218,17 @@ public class FlinkExecutable extends AbstractExecutable {
logger.info("cmd: " + cmd);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final CliCommandExecutor exec = new CliCommandExecutor();
- final PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
- @Override
- public void onLogEvent(String infoKey, Map<String, String> info) {
- // only care three properties here
- if (ExecutableConstants.FLINK_JOB_ID.equals(infoKey)
- || ExecutableConstants.YARN_APP_ID.equals(infoKey)
- || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
- getManager().addJobInfo(getId(), info);
- }
+ final PatternedLogger patternedLogger = new PatternedLogger(logger, (String infoKey, Map<String, String> info) -> {
+ // only care three properties here
+ if (ExecutableConstants.FLINK_JOB_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_ID.equals(infoKey)
+ || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
+ getManager().addJobInfo(getId(), info);
}
});
- Callable callable = new Callable<Pair<Integer, String>>() {
- @Override
- public Pair<Integer, String> call() throws Exception {
+ try {
+ Future<Pair<Integer, String>> future = executorService.submit(() -> {
Pair<Integer, String> result;
try {
result = exec.execute(cmd, patternedLogger);
@@ -242,11 +237,7 @@ public class FlinkExecutable extends AbstractExecutable {
result = new Pair<>(-1, e.getMessage());
}
return result;
- }
- };
-
- try {
- Future<Pair<Integer, String>> future = executorService.submit(callable);
+ });
Pair<Integer, String> result = null;
while (!isDiscarded() && !isPaused()) {
if (future.isDone()) {
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
index 3839535..f88eb0c 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
@@ -90,6 +90,8 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
.isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
.isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+ public static final Option OPTION_ENABLE_OBJECT_REUSE = OptionBuilder.withArgName("enableObjectReuse").hasArg()
+ .isRequired(false).withDescription("Enable object reuse").create("enableObjectReuse");
private Options options;
@@ -101,6 +103,7 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
options.addOption(OPTION_MERGE_SEGMENT_IDS);
options.addOption(OPTION_OUTPUT_PATH_DICT);
options.addOption(OPTION_OUTPUT_PATH_STAT);
+ options.addOption(OPTION_ENABLE_OBJECT_REUSE);
}
@Override
@@ -116,10 +119,20 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
+ final String enableObjectReuseOptValue = optionsHelper.getOptionValue(OPTION_ENABLE_OBJECT_REUSE);
+
+ boolean enableObjectReuse = false;
+ if (enableObjectReuseOptValue != null && !enableObjectReuseOptValue.isEmpty()) {
+ enableObjectReuse = true;
+ }
final Job job = Job.getInstance();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ if (enableObjectReuse) {
+ env.getConfig().enableObjectReuse();
+ }
+
HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
@@ -143,8 +156,7 @@ public class FlinkMergingDictionary extends AbstractApplication implements Seria
DataSource<Integer> indexDS = env.fromCollection(indexs);
DataSet<Tuple2<Text, Text>> colToDictPathDS = indexDS.map(new MergeDictAndStatsFunction(cubeName,
- metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf))
- .setParallelism(columnLength + 1);
+ metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf));
FlinkUtil.setHadoopConfForCuboid(job, null, null);
HadoopOutputFormat<Text, Text> hadoopOF =
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
index 884b25d..4ea5fc2 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -33,14 +33,12 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
@@ -94,18 +92,6 @@ public class FlinkUtil {
return env.createInput(HadoopInputs.readSequenceFile(keyClass, valueClass, StringUtil.join(inputFolders, ",")));
}
- public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
- double totalSize = 0;
- for (double x: statsReader.getCuboidSizeMap().values()) {
- totalSize += x;
- }
- float partitionCutMB = kylinConfig.getFlinkPartitionCutMB();
- int partition = (int) (totalSize / partitionCutMB);
- partition = Math.max(kylinConfig.getFlinkMinPartition(), partition);
- partition = Math.min(kylinConfig.getFlinkMaxPartition(), partition);
- return partition;
- }
-
public static void setHadoopConfForCuboid(Job job, CubeSegment segment, String metaUrl) throws Exception {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);