You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:36 UTC

[kylin] 23/30: Minor, fix merge error during IT test

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 87139cdc3459fa78dc9f61f77aa799f213c1ea0b
Author: nichunen <ni...@apache.org>
AuthorDate: Sun Jul 7 15:27:16 2019 +0800

    Minor, fix merge error during IT test
---
 .../org/apache/kylin/engine/flink/FlinkCubingMerge.java    | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index b44ef03..8ac96c1 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -51,7 +51,6 @@ import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
 import org.apache.kylin.measure.BufferedMeasureCodec;
@@ -128,7 +127,6 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
 
         logger.info("Input path: {}", inputPath);
         logger.info("Output path: {}", outputPath);
@@ -175,7 +173,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
                     new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
 
             if (mergingSegs.size() > 0) {
-                DataSet unionedDataSet = mergingSegs.get(0);
+                DataSet<Tuple2<Text, Object[]>> unionedDataSet = mergingSegs.get(0);
                 for (int i = 1; i < mergingSegs.size(); i++) {
                     unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
                 }
@@ -271,7 +269,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         }
     }
 
-    private static class MeasureReduceFunction implements ReduceFunction<Object[]> {
+    private static class MeasureReduceFunction implements ReduceFunction<Tuple2<Text, Object[]>> {
 
         private MeasureAggregators aggregators;
 
@@ -280,10 +278,10 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         }
 
         @Override
-        public Object[] reduce(Object[] input1, Object[] input2) throws Exception {
-            Object[] measureObjs = new Object[input1.length];
-            aggregators.aggregate(input1, input2, measureObjs);
-            return measureObjs;
+        public Tuple2<Text, Object[]> reduce(Tuple2<Text, Object[]> input1, Tuple2<Text, Object[]> input2) throws Exception {
+            Object[] measureObjs = new Object[input1.f1.length];
+            aggregators.aggregate(input1.f1, input2.f1, measureObjs);
+            return new Tuple2<>(input1.f0, measureObjs);
         }
     }