You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:42 UTC
[kylin] 29/30: Fix flink config bug and some minor fixes
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit cda816304a10168d750a63437c8e0487f58f6e30
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Tue Dec 17 17:47:08 2019 +0800
Fix flink config bug and some minor fixes
---
.../java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java | 4 ++--
.../org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java | 4 ++--
.../main/java/org/apache/kylin/engine/flink/FlinkExecutable.java | 6 +++---
3 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
index abd57b0..d34e803 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
@@ -41,8 +41,8 @@ public class FlinkBatchCubingEngine2 implements IBatchCubingEngine {
}
@Override
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return new FlinkBatchCubingJobBuilder2(newSegment, submitter).build();
+ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+ return new FlinkBatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build();
}
@Override
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
index ba475bf..925334c 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
@@ -44,8 +44,8 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
private final IFlinkInput.IFlinkBatchCubingInputSide inputSide;
private final IFlinkOutput.IFlinkBatchCubingOutputSide outputSide;
- public FlinkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
- super(newSegment, submitter);
+ public FlinkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+ super(newSegment, submitter, priorityOffset);
this.inputSide = FlinkUtil.getBatchCubingInputSide(seg);
this.outputSide = FlinkUtil.getBatchCubingOutputSide(seg);
}
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 95e9c4f..5344cb5 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -89,7 +89,7 @@ public class FlinkExecutable extends AbstractExecutable {
public void setCounterSaveAs(String value, String counterOutputPath) {
this.setParam(COUNTER_SAVE_AS, value);
- this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+ this.setParam(BatchConstants.ARG_COUNTER_OUTPUT, counterOutputPath);
}
public String getCounterSaveAs() {
@@ -205,7 +205,7 @@ public class FlinkExecutable extends AbstractExecutable {
//flink on yarn specific option (pattern : -yn 1)
if (configOptionKey.startsWith("-y") && !entry.getValue().isEmpty()) {
sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue());
- } else {
+ } else if(!configOptionKey.startsWith("-y")){
//flink on yarn specific option (pattern : -yD taskmanager.network.memory.min=536346624)
sb.append(" ").append(configOptionKey).append("=").append(entry.getValue());
}
@@ -272,7 +272,7 @@ public class FlinkExecutable extends AbstractExecutable {
// done, update all properties
Map<String, String> joblogInfo = patternedLogger.getInfo();
// read counter from hdfs
- String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+ String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUTPUT);
if (counterOutput != null) {
if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);