Posted to commits@kylin.apache.org by sh...@apache.org on 2020/03/03 09:11:53 UTC
[kylin] branch master updated: KYLIN-4387 Flink cubing merge step failed
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 22ac013 KYLIN-4387 Flink cubing merge step failed
22ac013 is described below
commit 22ac0132dabff0241eb4a7781a451b82d907c5a7
Author: Harvey Yue <we...@roadefend.com>
AuthorDate: Mon Mar 2 12:21:15 2020 +0800
KYLIN-4387 Flink cubing merge step failed
---
.../kylin/engine/flink/FlinkCubingMerge.java | 78 +++++++++++++++-------
1 file changed, 54 insertions(+), 24 deletions(-)
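In brief, the patch reworks the Flink merge job along three lines: cuboid tuples are keyed by Kylin's ByteArray instead of Hadoop's Text, per-key aggregation moves from a pairwise ReduceFunction to a RichGroupReduceFunction that folds a whole group at once, and the by-level input is read through the new-API (org.apache.hadoop.mapreduce) SequenceFileInputFormat via HadoopInputs.readHadoopFile. It also resolves level paths through FlinkBatchMergeJobBuilder2 rather than the MR engine's BatchCubingJobBuilder2. The commit message does not spell out the root cause, but the consistent move away from mutable Text keys suggests the failure involved grouping on reused Writable objects.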
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 8ac96c1..c02cd41 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.flink;
import com.google.common.collect.Lists;
@@ -22,6 +22,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,14 +30,17 @@ import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.Pair;
@@ -47,7 +51,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.flink.util.PercentileCounterSerializer;
-import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -147,7 +150,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
FileSystem fs = HadoopUtil.getWorkingFileSystem();
boolean isLegacyMode = false;
for (String inputFolder : inputFolders) {
- Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+ Path baseCuboidPath = new Path(FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
if (fs.exists(baseCuboidPath) == false) {
// doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
isLegacyMode = true;
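The probe checks for the level-0 ("base cuboid") folder under each input segment: if it is missing, that segment's cuboids were written flat rather than by layer, and the job falls back to the legacy single-pass merge. Resolving the path through FlinkBatchMergeJobBuilder2 keeps the convention aligned with what the Flink engine itself writes, instead of borrowing the MR builder's. The helper is not shown in this diff; a sketch of its assumed shape, following Kylin's usual by-level layout:

    // Assumed implementation, mirroring JobBuilderSupport's layered layout;
    // the folder names here are illustrative, not confirmed by this diff.
    public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
        String name = (level == 0) ? "level_base_cuboid" : "level_" + level + "_cuboid";
        return cuboidRootPath + "/" + name;
    }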
@@ -157,13 +160,13 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
if (isLegacyMode) {
// merge all layer's cuboid at once, this might be hard for Spark
- List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
+ List<DataSet<Tuple2<ByteArray, Object[]>>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
for (int i = 0; i < inputFolders.length; i++) {
String path = inputFolders[i];
DataSet segRdd = FlinkUtil.parseInputPath(path, fs, env, Text.class, Text.class);
CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
// re-encode with new dictionaries
- DataSet<Tuple2<Text, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
+ DataSet<Tuple2<ByteArray, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
mergingSegs.add(newEcoddedRdd);
}
@@ -173,34 +176,34 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
if (mergingSegs.size() > 0) {
- DataSet<Tuple2<Text, Object[]>> unionedDataSet = mergingSegs.get(0);
+ DataSet<Tuple2<ByteArray, Object[]>> unionedDataSet = mergingSegs.get(0);
for (int i = 1; i < mergingSegs.size(); i++) {
unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
}
unionedDataSet
.groupBy(0)
- .reduce(new MeasureReduceFunction(aggregators))
+ .reduceGroup(new MeasureReduceGroupFunction(aggregators))
.map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
.output(hadoopOF);
}
} else {
// merge by layer
for (int level = 0; level <= totalLevels; level++) {
- List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayList();
+ List<DataSet<Tuple2<ByteArray, Object[]>>> mergingSegs = Lists.newArrayList();
for (int i = 0; i < inputFolders.length; i++) {
String path = inputFolders[i];
CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
- final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
- DataSet<Tuple2<Text, Text>> segRdd = env.createInput(
- HadoopInputs.readSequenceFile(Text.class, Text.class, cuboidInputPath));
+ final String cuboidInputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+ DataSet<Tuple2<Text, Text>> segRdd = env.createInput(HadoopInputs.readHadoopFile(
+ new SequenceFileInputFormat(), Text.class, Text.class, cuboidInputPath));
// re-encode with new dictionaries
- DataSet<Tuple2<Text, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
+ DataSet<Tuple2<ByteArray, Object[]>> newEcoddedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
mergingSegs.add(newEcoddedRdd);
}
- final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+ final String cuboidOutputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
Job jobInstanceForEachOutputFormat = Job.getInstance();
FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compress
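The input-format change above is subtle: HadoopInputs.readSequenceFile binds to the legacy org.apache.hadoop.mapred API, while readHadoopFile takes a new-API (org.apache.hadoop.mapreduce) format such as the SequenceFileInputFormat now imported at the top of the file. A minimal sketch of the two call shapes, assuming Flink 1.x with flink-hadoop-compatibility on the classpath and a hypothetical cuboid folder passed as args[0]:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class SeqFileReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            String path = args[0];  // hypothetical cuboid folder

            // Old mapred API, as on the removed line:
            DataSet<Tuple2<Text, Text>> viaMapred = env.createInput(
                    HadoopInputs.readSequenceFile(Text.class, Text.class, path));

            // New mapreduce API, as the patch now uses:
            DataSet<Tuple2<Text, Text>> viaMapreduce = env.createInput(
                    HadoopInputs.readHadoopFile(
                            new SequenceFileInputFormat<Text, Text>(), Text.class, Text.class, path));

            System.out.println(viaMapred.count() + " / " + viaMapreduce.count());
        }
    }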
@@ -211,14 +214,14 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), jobInstanceForEachOutputFormat);
if (mergingSegs.size() > 0) {
- DataSet unionedDataSet = mergingSegs.get(0);
+ DataSet<Tuple2<ByteArray, Object[]>> unionedDataSet = mergingSegs.get(0);
for (int i = 1; i < mergingSegs.size(); i++) {
unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
}
unionedDataSet
.groupBy(0)
- .reduce(new MeasureReduceFunction(aggregators))
+ .reduceGroup(new MeasureReduceGroupFunction(aggregators))
.map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
.output(hadoopOF);
}
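Both branches of the job now end in groupBy(0).reduceGroup(...) instead of groupBy(0).reduce(...). With reduce, Flink combines records pairwise and keeps feeding intermediate tuples (including their key objects) back into the function; with reduceGroup, every record for a key arrives in a single Iterable and exactly one result is emitted. The JIRA title only records that the merge step failed, but a group-wise fold over freshly copied ByteArray keys sidesteps any object-reuse hazards with Hadoop's mutable Text. A self-contained sketch of the pattern, with a plain long sum standing in for Kylin's MeasureAggregators:

    import org.apache.flink.api.common.functions.RichGroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class GroupReduceSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<String, Long>> measures = env.fromElements(
                    Tuple2.of("cuboid-1", 1L), Tuple2.of("cuboid-1", 2L), Tuple2.of("cuboid-2", 5L));
            measures.groupBy(0)
                    .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                        @Override
                        public void reduce(Iterable<Tuple2<String, Long>> values,
                                           Collector<Tuple2<String, Long>> out) {
                            String key = null;
                            long sum = 0L;  // stands in for MeasureAggregators.aggregate(...)
                            for (Tuple2<String, Long> v : values) {
                                key = v.f0;
                                sum += v.f1;
                            }
                            out.collect(Tuple2.of(key, sum));
                        }
                    })
                    .print();
        }
    }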
@@ -234,7 +237,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
return CubeInstance.findSegmentWithJobId(jobID, cube);
}
- private static class ReEncodeCuboidFunction extends RichMapFunction<Tuple2<Text, Text>, Tuple2<Text, Object[]>> {
+ private static class ReEncodeCuboidFunction extends RichMapFunction<Tuple2<Text, Text>, Tuple2<ByteArray, Object[]>> {
private String cubeName;
private String sourceSegmentId;
private String mergedSegmentId;
@@ -244,7 +247,7 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
private transient SegmentReEncoder segmentReEncoder = null;
ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
- SerializableConfiguration conf) {
+ SerializableConfiguration conf) {
this.cubeName = cubeName;
this.sourceSegmentId = sourceSegmentId;
this.mergedSegmentId = mergedSegmentId;
@@ -263,13 +266,13 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
}
@Override
- public Tuple2<Text, Object[]> map(Tuple2<Text, Text> textTextTuple2) throws Exception {
+ public Tuple2<ByteArray, Object[]> map(Tuple2<Text, Text> textTextTuple2) throws Exception {
Pair<Text, Object[]> encodedPair = segmentReEncoder.reEncode2(textTextTuple2.f0, textTextTuple2.f1);
- return new Tuple2(encodedPair.getFirst(), encodedPair.getSecond());
+ return new Tuple2(new ByteArray(encodedPair.getFirst().getBytes()), encodedPair.getSecond());
}
}
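One detail worth flagging in the re-encode step: Text.getBytes() exposes the whole backing buffer, which can be longer than the valid content (text.getLength()). The call above presumably receives a right-sized Text from SegmentReEncoder.reEncode2, so the plain getBytes() copy works; for the general case, a length-aware copy looks like this:

    import java.util.Arrays;
    import org.apache.hadoop.io.Text;
    import org.apache.kylin.common.util.ByteArray;

    final class KeyCopy {
        // Defensive variant: copy only the valid region [0, getLength()).
        static ByteArray toKey(Text t) {
            return new ByteArray(Arrays.copyOf(t.getBytes(), t.getLength()));
        }
    }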
- private static class MeasureReduceFunction implements ReduceFunction<Tuple2<Text, Object[]>> {
+ private static class MeasureReduceFunction implements ReduceFunction<Tuple2<ByteArray, Object[]>> {
private MeasureAggregators aggregators;
@@ -278,14 +281,41 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
}
@Override
- public Tuple2<Text, Object[]> reduce(Tuple2<Text, Object[]> input1, Tuple2<Text, Object[]> input2) throws Exception {
+ public Tuple2<ByteArray, Object[]> reduce(Tuple2<ByteArray, Object[]> input1, Tuple2<ByteArray, Object[]> input2) throws Exception {
Object[] measureObjs = new Object[input1.f1.length];
aggregators.aggregate(input1.f1, input2.f1, measureObjs);
return new Tuple2<>(input1.f0, measureObjs);
}
}
- private static class ConvertTextMapFunction extends RichMapFunction<Tuple2<Text, Object[]>, Tuple2<Text, Text>> {
+ private static class MeasureReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+ private MeasureAggregators aggregators;
+
+ public MeasureReduceGroupFunction(MeasureAggregators aggregators) {
+ this.aggregators = aggregators;
+ }
+
+ @Override
+ public void reduce(Iterable<Tuple2<ByteArray, Object[]>> values, Collector<Tuple2<ByteArray, Object[]>> out) throws Exception {
+ Object[] result = null;
+ ByteArray key = null;
+
+ for (Tuple2<ByteArray, Object[]> item : values) {
+ key = item.f0;
+ if (result == null) {
+ result = item.f1;
+ } else {
+ Object[] temp = new Object[result.length];
+ aggregators.aggregate(item.f1, result, temp);
+ result = temp;
+ }
+ }
+ out.collect(new Tuple2<>(key, result));
+ }
+ }
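A note on the fold above: result starts as the first record's measure array, and each later record is folded in through aggregators.aggregate(item.f1, result, temp). Allocating a fresh temp per step keeps the fold from writing into either input array. Grouping guarantees every record in values carries the same key, so capturing key from the last element seen is safe.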
+
+ private static class ConvertTextMapFunction extends RichMapFunction<Tuple2<ByteArray, Object[]>, Tuple2<Text, Text>> {
private BufferedMeasureCodec codec;
private SerializableConfiguration sConf;
@@ -309,11 +339,11 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
}
@Override
- public Tuple2<Text, Text> map(Tuple2<Text, Object[]> tuple2) throws Exception {
+ public Tuple2<Text, Text> map(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
ByteBuffer valueBuf = codec.encode(tuple2.f1);
Text result = new Text();
result.set(valueBuf.array(), 0, valueBuf.position());
- return new Tuple2<>(tuple2.f0, result);
+ return new Tuple2<>(new Text(tuple2.f0.array()), result);
}
}
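Downstream, the ByteArray key is turned back into Text because the job still writes through SequenceFileOutputFormat with Text keys and values; the measure array is serialized by BufferedMeasureCodec into a ByteBuffer whose valid bytes are [0, position()). Only the in-flight key type changed; the on-disk format of the merged cuboids stays the same.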