You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:42 UTC

[kylin] 29/30: Fix flink config bug and some minor fix

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit cda816304a10168d750a63437c8e0487f58f6e30
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Tue Dec 17 17:47:08 2019 +0800

    Fix flink config bug and some minor fix
---
 .../java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java | 4 ++--
 .../org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java  | 4 ++--
 .../main/java/org/apache/kylin/engine/flink/FlinkExecutable.java    | 6 +++---
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
index abd57b0..d34e803 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingEngine2.java
@@ -41,8 +41,8 @@ public class FlinkBatchCubingEngine2 implements IBatchCubingEngine {
     }
 
     @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new FlinkBatchCubingJobBuilder2(newSegment, submitter).build();
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+        return new FlinkBatchCubingJobBuilder2(newSegment, submitter, priorityOffset).build();
     }
 
     @Override
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
index ba475bf..925334c 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchCubingJobBuilder2.java
@@ -44,8 +44,8 @@ public class FlinkBatchCubingJobBuilder2 extends JobBuilderSupport {
     private final IFlinkInput.IFlinkBatchCubingInputSide inputSide;
     private final IFlinkOutput.IFlinkBatchCubingOutputSide outputSide;
 
-    public FlinkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
-        super(newSegment, submitter);
+    public FlinkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
+        super(newSegment, submitter, priorityOffset);
         this.inputSide = FlinkUtil.getBatchCubingInputSide(seg);
         this.outputSide = FlinkUtil.getBatchCubingOutputSide(seg);
     }
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 95e9c4f..5344cb5 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -89,7 +89,7 @@ public class FlinkExecutable extends AbstractExecutable {
 
     public void setCounterSaveAs(String value, String counterOutputPath) {
         this.setParam(COUNTER_SAVE_AS, value);
-        this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+        this.setParam(BatchConstants.ARG_COUNTER_OUTPUT, counterOutputPath);
     }
 
     public String getCounterSaveAs() {
@@ -205,7 +205,7 @@ public class FlinkExecutable extends AbstractExecutable {
                     //flink on yarn specific option (pattern : -yn 1)
                     if (configOptionKey.startsWith("-y") && !entry.getValue().isEmpty()) {
                         sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue());
-                    } else {
+                    } else if(!configOptionKey.startsWith("-y")){
                         //flink on yarn specific option (pattern : -yD taskmanager.network.memory.min=536346624)
                         sb.append(" ").append(configOptionKey).append("=").append(entry.getValue());
                     }
@@ -272,7 +272,7 @@ public class FlinkExecutable extends AbstractExecutable {
                     // done, update all properties
                     Map<String, String> joblogInfo = patternedLogger.getInfo();
                     // read counter from hdfs
-                    String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+                    String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUTPUT);
                     if (counterOutput != null) {
                         if (HadoopUtil.getWorkingFileSystem().exists(new Path(counterOutput))) {
                             Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);