You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2020/03/03 09:11:53 UTC

[kylin] branch master updated: KYLIN-4387 Flink cubing merge step failed

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 22ac013  KYLIN-4387 Flink cubing merge step failed
22ac013 is described below

commit 22ac0132dabff0241eb4a7781a451b82d907c5a7
Author: Harvey Yue <we...@roadefend.com>
AuthorDate: Mon Mar 2 12:21:15 2020 +0800

    KYLIN-4387 Flink cubing merge step failed
---
 .../kylin/engine/flink/FlinkCubingMerge.java       | 78 +++++++++++++++-------
 1 file changed, 54 insertions(+), 24 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 8ac96c1..c02cd41 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.engine.flink;
 
 import com.google.common.collect.Lists;
@@ -22,6 +22,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,14 +30,17 @@ import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.flink.util.Collector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Pair;
@@ -47,7 +51,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.flink.util.PercentileCounterSerializer;
-import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -147,7 +150,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         boolean isLegacyMode = false;
         for (String inputFolder : inputFolders) {
-            Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+            Path baseCuboidPath = new Path(FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
             if (fs.exists(baseCuboidPath) == false) {
                 // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
                 isLegacyMode = true;
@@ -157,13 +160,13 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
 
         if (isLegacyMode) {
             // merge all layer's cuboid at once, this might be hard for Spark
-            List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
+            List<DataSet<Tuple2<ByteArray, Object[]>>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
             for (int i = 0; i < inputFolders.length; i++) {
                 String path = inputFolders[i];
                 DataSet segRdd = FlinkUtil.parseInputPath(path, fs, env, Text.class, Text.class);
                 CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
                 // re-encode with new dictionaries
-                DataSet<Tuple2<Text, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
+                DataSet<Tuple2<ByteArray, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
                         sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
                 mergingSegs.add(newEcoddedRdd);
             }
@@ -173,34 +176,34 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
                     new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
 
             if (mergingSegs.size() > 0) {
-                DataSet<Tuple2<Text, Object[]>> unionedDataSet = mergingSegs.get(0);
+                DataSet<Tuple2<ByteArray, Object[]>> unionedDataSet = mergingSegs.get(0);
                 for (int i = 1; i < mergingSegs.size(); i++) {
                     unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
                 }
 
                 unionedDataSet
                         .groupBy(0)
-                        .reduce(new MeasureReduceFunction(aggregators))
+                        .reduceGroup(new MeasureReduceGroupFunction(aggregators))
                         .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
                         .output(hadoopOF);
             }
         } else {
             // merge by layer
             for (int level = 0; level <= totalLevels; level++) {
-                List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayList();
+                List<DataSet<Tuple2<ByteArray, Object[]>>> mergingSegs = Lists.newArrayList();
                 for (int i = 0; i < inputFolders.length; i++) {
                     String path = inputFolders[i];
                     CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
-                    final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
-                    DataSet<Tuple2<Text, Text>> segRdd = env.createInput(
-                            HadoopInputs.readSequenceFile(Text.class, Text.class, cuboidInputPath));
+                    final String cuboidInputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+                    DataSet<Tuple2<Text, Text>> segRdd = env.createInput(HadoopInputs.readHadoopFile(
+                            new SequenceFileInputFormat(), Text.class, Text.class, cuboidInputPath));
                     // re-encode with new dictionaries
-                    DataSet<Tuple2<Text, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
+                    DataSet<Tuple2<ByteArray, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
                             sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
                     mergingSegs.add(newEcoddedRdd);
                 }
 
-                final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+                final String cuboidOutputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
 
                 Job jobInstanceForEachOutputFormat = Job.getInstance();
                 FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compress
@@ -211,14 +214,14 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
                         new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), jobInstanceForEachOutputFormat);
 
                 if (mergingSegs.size() > 0) {
-                    DataSet unionedDataSet = mergingSegs.get(0);
+                    DataSet<Tuple2<ByteArray, Object[]>> unionedDataSet = mergingSegs.get(0);
                     for (int i = 1; i < mergingSegs.size(); i++) {
                         unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
                     }
 
                     unionedDataSet
                             .groupBy(0)
-                            .reduce(new MeasureReduceFunction(aggregators))
+                            .reduceGroup(new MeasureReduceGroupFunction(aggregators))
                             .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
                             .output(hadoopOF);
                 }
@@ -234,7 +237,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         return CubeInstance.findSegmentWithJobId(jobID, cube);
     }
 
-    private static class ReEncodeCuboidFunction extends RichMapFunction<Tuple2<Text, Text>, Tuple2<Text, Object[]>> {
+    private static class ReEncodeCuboidFunction extends RichMapFunction<Tuple2<Text, Text>, Tuple2<ByteArray, Object[]>> {
         private String cubeName;
         private String sourceSegmentId;
         private String mergedSegmentId;
@@ -244,7 +247,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         private transient SegmentReEncoder segmentReEncoder = null;
 
         ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
-                SerializableConfiguration conf) {
+                               SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.sourceSegmentId = sourceSegmentId;
             this.mergedSegmentId = mergedSegmentId;
@@ -263,13 +266,13 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         }
 
         @Override
-        public Tuple2<Text, Object[]> map(Tuple2<Text, Text> textTextTuple2) throws Exception {
+        public Tuple2<ByteArray, Object[]> map(Tuple2<Text, Text> textTextTuple2) throws Exception {
             Pair<Text, Object[]> encodedPair = segmentReEncoder.reEncode2(textTextTuple2.f0, textTextTuple2.f1);
-            return new Tuple2(encodedPair.getFirst(), encodedPair.getSecond());
+            return new Tuple2(new ByteArray(encodedPair.getFirst().getBytes()), encodedPair.getSecond());
         }
     }
 
-    private static class MeasureReduceFunction implements ReduceFunction<Tuple2<Text, Object[]>> {
+    private static class MeasureReduceFunction implements ReduceFunction<Tuple2<ByteArray, Object[]>> {
 
         private MeasureAggregators aggregators;
 
@@ -278,14 +281,41 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         }
 
         @Override
-        public Tuple2<Text, Object[]> reduce(Tuple2<Text, Object[]> input1, Tuple2<Text, Object[]> input2) throws Exception {
+        public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1, Tuple2<ByteArray, Object[]> input2) throws Exception {
             Object[] measureObjs = new Object[input1.f1.length];
             aggregators.aggregate(input1.f1, input2.f1, measureObjs);
             return new Tuple2<>(input1.f0, measureObjs);
         }
     }
 
-    private static class ConvertTextMapFunction extends RichMapFunction<Tuple2<Text, Object[]>, Tuple2<Text, Text>> {
+    private static class MeasureReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+        private MeasureAggregators aggregators;
+
+        public MeasureReduceGroupFunction(MeasureAggregators aggregators) {
+            this.aggregators = aggregators;
+        }
+
+        @Override
+        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> values, Collector<Tuple2<ByteArray, Object[]>> out) throws Exception {
+            Object[] result = null;
+            ByteArray key = null;
+
+            for (Tuple2<ByteArray, Object[]> item : values) {
+                key = item.f0;
+                if (result == null) {
+                    result = item.f1;
+                } else {
+                    Object[] temp = new Object[result.length];
+                    aggregators.aggregate(item.f1, result, temp);
+                    result = temp;
+                }
+            }
+            out.collect(new Tuple2<>(key, result));
+        }
+    }
+
+    private static class ConvertTextMapFunction extends RichMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<Text, Text>> {
 
         private BufferedMeasureCodec codec;
         private SerializableConfiguration sConf;
@@ -309,11 +339,11 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         }
 
         @Override
-        public Tuple2<Text, Text> map(Tuple2<Text, Object[]> tuple2) throws Exception {
+        public Tuple2<Text, Text> map(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
             ByteBuffer valueBuf = codec.encode(tuple2.f1);
             Text result = new Text();
             result.set(valueBuf.array(), 0, valueBuf.position());
-            return new Tuple2<>(tuple2.f0, result);
+            return new Tuple2<>(new Text(tuple2.f0.array()), result);
         }
 
     }