You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:36 UTC
[kylin] 23/30: Minor, fix merge error during integration test (IT)
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 87139cdc3459fa78dc9f61f77aa799f213c1ea0b
Author: nichunen <ni...@apache.org>
AuthorDate: Sun Jul 7 15:27:16 2019 +0800
Minor, fix merge error during integration test (IT)
---
.../org/apache/kylin/engine/flink/FlinkCubingMerge.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index b44ef03..8ac96c1 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -51,7 +51,6 @@ import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
import org.apache.kylin.measure.BufferedMeasureCodec;
@@ -128,7 +127,6 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
logger.info("Input path: {}", inputPath);
logger.info("Output path: {}", outputPath);
@@ -175,7 +173,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
if (mergingSegs.size() > 0) {
- DataSet unionedDataSet = mergingSegs.get(0);
+ DataSet<Tuple2<Text, Object[]>> unionedDataSet = mergingSegs.get(0);
for (int i = 1; i < mergingSegs.size(); i++) {
unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
}
@@ -271,7 +269,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
}
}
- private static class MeasureReduceFunction implements ReduceFunction<Object[]> {
+ private static class MeasureReduceFunction implements ReduceFunction<Tuple2<Text, Object[]>> {
private MeasureAggregators aggregators;
@@ -280,10 +278,10 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
}
@Override
- public Object[] reduce(Object[] input1, Object[] input2) throws Exception {
- Object[] measureObjs = new Object[input1.length];
- aggregators.aggregate(input1, input2, measureObjs);
- return measureObjs;
+ public Tuple2<Text, Object[]> reduce(Tuple2<Text, Object[]> input1, Tuple2<Text, Object[]> input2) throws Exception {
+ Object[] measureObjs = new Object[input1.f1.length];
+ aggregators.aggregate(input1.f1, input2.f1, measureObjs);
+ return new Tuple2<>(input1.f0, measureObjs);
}
}