Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/30 09:41:41 UTC
[kylin] branch master updated: KYLIN-3597 Close JavaSparkContext after used.
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new f859e43 KYLIN-3597 Close JavaSparkContext after used.
f859e43 is described below
commit f859e4363f4fea381ee65538f76c37882750a531
Author: Lijun Cao <64...@qq.com>
AuthorDate: Sat Sep 29 11:46:19 2018 +0800
KYLIN-3597 Close JavaSparkContext after used.
---
.../kylin/engine/spark/SparkCubingMerge.java | 201 +++++++++++----------
.../kylin/engine/spark/SparkFactDistinct.java | 96 +++++-----
.../kylin/engine/spark/SparkMergingDictionary.java | 42 +++--
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 173 +++++++++---------
4 files changed, 259 insertions(+), 253 deletions(-)
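The change is the same in all four files: the JavaSparkContext is now created in a try-with-resources block, so the job body only shifts indentation while the context is guaranteed to be stopped when the block exits. A minimal, illustrative sketch of that pattern follows (not Kylin code; the class name, app name, and local master are made up for this example). JavaSparkContext implements java.io.Closeable, and close() delegates to stop(), which is what makes try-with-resources applicable here.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class CloseContextSketch {
    public static void main(String[] args) {
        // Local master only so the sketch runs standalone.
        SparkConf conf = new SparkConf()
                .setAppName("close-context-sketch")
                .setMaster("local[1]");
        // try-with-resources: sc.close() (which calls sc.stop()) runs when the
        // block exits, whether the job completes or throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            long count = sc.parallelize(Arrays.asList(1, 2, 3)).count();
            System.out.println("count=" + count);
        }
        // Without the try block, a failure between creating the context and
        // the final stop() call could leave the context running.
    }
}

In the diffs below, the bulk of each hunk is the existing job logic (RDD construction, reduceByKey, saveAsNewAPIHadoopDataset, counter output) re-indented one level inside the new try block; only the context creation line and the closing brace are genuinely new.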
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 0b03f70..5037647 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -111,127 +111,128 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
- JavaSparkContext sc = new JavaSparkContext(conf);
- SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
- KylinSparkJobListener jobListener = new KylinSparkJobListener();
- sc.sc().addSparkListener(jobListener);
-
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
- final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-
- final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
- final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
- final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
-
- logger.info("Input path: {}", inputPath);
- logger.info("Output path: {}", outputPath);
-
- final Job job = Job.getInstance(sConf.get());
-
- SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
-
- final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
- final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
- @Override
- public Object[] call(Object[] input1, Object[] input2) throws Exception {
- Object[] measureObjs = new Object[input1.length];
- aggregators.aggregate(input1, input2, measureObjs);
- return measureObjs;
- }
- };
-
- final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
- private transient volatile boolean initialized = false;
- BufferedMeasureCodec codec;
-
- @Override
- public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
- throws Exception {
-
- if (initialized == false) {
- synchronized (SparkCubingMerge.class) {
- if (initialized == false) {
- synchronized (SparkCubingMerge.class) {
- if (initialized == false) {
- KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
- .setAndUnsetThreadLocalConfig(kylinConfig)) {
- CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
- codec = new BufferedMeasureCodec(desc.getMeasures());
- initialized = true;
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ sc.sc().addSparkListener(jobListener);
+
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+ logger.info("Input path: {}", inputPath);
+ logger.info("Output path: {}", outputPath);
+
+ final Job job = Job.getInstance(sConf.get());
+
+ SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+ final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+ final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
+ @Override
+ public Object[] call(Object[] input1, Object[] input2) throws Exception {
+ Object[] measureObjs = new Object[input1.length];
+ aggregators.aggregate(input1, input2, measureObjs);
+ return measureObjs;
+ }
+ };
+
+ final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+ private transient volatile boolean initialized = false;
+ BufferedMeasureCodec codec;
+
+ @Override
+ public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
+ throws Exception {
+
+ if (initialized == false) {
+ synchronized (SparkCubingMerge.class) {
+ if (initialized == false) {
+ synchronized (SparkCubingMerge.class) {
+ if (initialized == false) {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+ .setAndUnsetThreadLocalConfig(kylinConfig)) {
+ CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new BufferedMeasureCodec(desc.getMeasures());
+ initialized = true;
+ }
}
}
}
}
}
+ ByteBuffer valueBuf = codec.encode(tuple2._2());
+ byte[] encodedBytes = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+ return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
+ }
+ };
+
+ final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+ final String[] inputFolders = StringSplitter.split(inputPath, ",");
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ boolean isLegacyMode = false;
+ for (String inputFolder : inputFolders) {
+ Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+ if (fs.exists(baseCuboidPath) == false) {
+ // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
+ isLegacyMode = true;
+ break;
}
- ByteBuffer valueBuf = codec.encode(tuple2._2());
- byte[] encodedBytes = new byte[valueBuf.position()];
- System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
- return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
- }
- };
-
- final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
- final String[] inputFolders = StringSplitter.split(inputPath, ",");
- FileSystem fs = HadoopUtil.getWorkingFileSystem();
- boolean isLegacyMode = false;
- for (String inputFolder : inputFolders) {
- Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
- if (fs.exists(baseCuboidPath) == false) {
- // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
- isLegacyMode = true;
- break;
- }
- }
-
- if (isLegacyMode == true) {
- // merge all layer's cuboid at once, this might be hard for Spark
- List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
- for (int i = 0; i < inputFolders.length; i++) {
- String path = inputFolders[i];
- JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
- CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
- // re-encode with new dictionaries
- JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
- sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
- mergingSegs.add(newEcoddedRdd);
}
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
- sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
- .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
- .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
-
- } else {
- // merge by layer
- for (int level = 0; level <= totalLevels; level++) {
- List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+ if (isLegacyMode == true) {
+ // merge all layer's cuboid at once, this might be hard for Spark
+ List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
for (int i = 0; i < inputFolders.length; i++) {
String path = inputFolders[i];
+ JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
- final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
- JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
// re-encode with new dictionaries
JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
mergingSegs.add(newEcoddedRdd);
}
- final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
- FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
-
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
- .reduceByKey(reduceFunction,
- SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+ .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
.mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+ } else {
+ // merge by layer
+ for (int level = 0; level <= totalLevels; level++) {
+ List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+ for (int i = 0; i < inputFolders.length; i++) {
+ String path = inputFolders[i];
+ CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+ final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+ JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
+ // re-encode with new dictionaries
+ JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+ sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+ mergingSegs.add(newEcoddedRdd);
+ }
+
+ final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+ FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+ sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+ .reduceByKey(reduceFunction,
+ SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+ .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ }
}
+ // output the data size to console, job engine will parse and save the metric
+ // please note: this mechanism won't work when spark.submit.deployMode=cluster
+ logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
}
- // output the data size to console, job engine will parse and save the metric
- // please note: this mechanism won't work when spark.submit.deployMode=cluster
- logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
}
static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 043f479..5cfd2d7 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -164,73 +164,75 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
KylinSparkJobListener jobListener = new KylinSparkJobListener();
- JavaSparkContext sc = new JavaSparkContext(conf);
- sc.sc().addSparkListener(jobListener);
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ sc.sc().addSparkListener(jobListener);
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
- KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
- final Job job = Job.getInstance(sConf.get());
+ final Job job = Job.getInstance(sConf.get());
- final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+ final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(
+ cubeInstance);
- logger.info("RDD Output path: {}", outputPath);
- logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
- logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
- logger.info("counter path {}", counterPath);
+ logger.info("RDD Output path: {}", outputPath);
+ logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+ logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+ logger.info("counter path {}", counterPath);
- boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+ boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE
+ .equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
- // calculate source record bytes size
- final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+ // calculate source record bytes size
+ final LongAccumulator bytesWritten = sc.sc().longAccumulator();
- final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+ final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
- JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
- new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+ JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
+ new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
- JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD
- .groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+ JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(
+ new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
- JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
- .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+ JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
+ .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
- // make each reducer output to respective dir
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
- NullWritable.class, Text.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
- NullWritable.class, ArrayPrimitiveWritable.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
- LongWritable.class, BytesWritable.class);
- MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
- NullWritable.class, LongWritable.class);
+ // make each reducer output to respective dir
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+ NullWritable.class, Text.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+ NullWritable.class, ArrayPrimitiveWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+ LongWritable.class, BytesWritable.class);
+ MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+ NullWritable.class, LongWritable.class);
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
- FileOutputFormat.setCompressOutput(job, false);
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ FileOutputFormat.setCompressOutput(job, false);
- // prevent to create zero-sized default output
- LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+ // prevent to create zero-sized default output
+ LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+ MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
- MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+ multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
- multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+ long recordCount = recordRDD.count();
+ logger.info("Map input records={}", recordCount);
+ logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
- long recordCount = recordRDD.count();
- logger.info("Map input records={}", recordCount);
- logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
- Map<String, String> counterMap = Maps.newHashMap();
- counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
- counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
- // save counter to hdfs
- HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
-
- HadoopUtil.deleteHDFSMeta(metaUrl);
+ HadoopUtil.deleteHDFSMeta(metaUrl);
+ }
}
static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 37f957f..bbdeb85 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -124,36 +124,38 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
- JavaSparkContext sc = new JavaSparkContext(conf);
- KylinSparkJobListener jobListener = new KylinSparkJobListener();
- sc.sc().addSparkListener(jobListener);
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ sc.sc().addSparkListener(jobListener);
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
- final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
- final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
- final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
- logger.info("Dictionary output path: {}", dictOutputPath);
- logger.info("Statistics output path: {}", statOutputPath);
+ logger.info("Dictionary output path: {}", dictOutputPath);
+ logger.info("Statistics output path: {}", statOutputPath);
- final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
- final int columnLength = tblColRefs.length;
+ final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+ final int columnLength = tblColRefs.length;
- List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+ List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
- for (int i = 0; i <= columnLength; i++) {
- indexs.add(i);
- }
+ for (int i = 0; i <= columnLength; i++) {
+ indexs.add(i);
+ }
- JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+ JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
- JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName, metaUrl,
- segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
+ JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName,
+ metaUrl, segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
- colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+ colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class,
+ SequenceFileOutputFormat.class);
+ }
}
public static class MergeDictAndStatsFunction implements PairFunction<Integer, Text, Text> {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index e2d43ba..96690d0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -134,112 +134,113 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
KylinSparkJobListener jobListener = new KylinSparkJobListener();
- JavaSparkContext sc = new JavaSparkContext(conf);
- sc.sc().addSparkListener(jobListener);
- final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
- if (!fs.exists(partitionFilePath)) {
- throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
- }
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+ sc.sc().addSparkListener(jobListener);
+ final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
+ if (!fs.exists(partitionFilePath)) {
+ throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
+ }
- HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
- final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
- final CubeDesc cubeDesc = cubeInstance.getDescriptor();
- final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
- final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+ final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
+ final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+ }
}
- }
- final int cfNum = keyValueCreators.size();
- final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
-
- logger.info("Input path: {}", inputPath);
- logger.info("Output path: {}", outputPath);
- // read partition split keys
- List<RowKeyWritable> keys = new ArrayList<>();
- try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
- RowKeyWritable key = new RowKeyWritable();
- Writable value = NullWritable.get();
- while (reader.next(key, value)) {
- keys.add(key);
- logger.info(" ------- split key: {}", key);
- key = new RowKeyWritable(); // important, new an object!
+ final int cfNum = keyValueCreators.size();
+ final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
+
+ logger.info("Input path: {}", inputPath);
+ logger.info("Output path: {}", outputPath);
+ // read partition split keys
+ List<RowKeyWritable> keys = new ArrayList<>();
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
+ RowKeyWritable key = new RowKeyWritable();
+ Writable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ keys.add(key);
+ logger.info(" ------- split key: {}", key);
+ key = new RowKeyWritable(); // important, new an object!
+ }
}
- }
- logger.info("There are {} split keys, totally {} hfiles", keys.size(), (keys.size() + 1));
+ logger.info("There are {} split keys, totally {} hfiles", keys.size(), (keys.size() + 1));
- //HBase conf
- logger.info("Loading HBase configuration from:{}", hbaseConfFile);
- FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+ //HBase conf
+ logger.info("Loading HBase configuration from:{}", hbaseConfFile);
+ FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
- Configuration hbaseJobConf = new Configuration();
- hbaseJobConf.addResource(confInput);
- hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
- Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
+ Configuration hbaseJobConf = new Configuration();
+ hbaseJobConf.addResource(confInput);
+ hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+ Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
- JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
- final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
- if (quickPath) {
- hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
- KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
- textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
- return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
- }
- });
- } else {
- hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
- throws Exception {
-
- List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
- Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
- inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
- inputMeasures);
-
- for (int i = 0; i < cfNum; i++) {
- KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
- result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
- outputValue));
+ JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+ final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+ if (quickPath) {
+ hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+ KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+ textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+ return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
}
+ });
+ } else {
+ hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+ throws Exception {
- return result.iterator();
- }
- });
- }
+ List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+ Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+ inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+ inputMeasures);
- hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
- RowKeyWritable.RowKeyComparator.INSTANCE)
- .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
- @Override
- public Tuple2<ImmutableBytesWritable, KeyValue> call(
- Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
- return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
- rowKeyWritableKeyValueTuple2._2);
+ for (int i = 0; i < cfNum; i++) {
+ KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+ result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+ outputValue));
+ }
+
+ return result.iterator();
}
- }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ });
+ }
- logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
+ hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+ RowKeyWritable.RowKeyComparator.INSTANCE)
+ .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(
+ Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+ return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+ rowKeyWritableKeyValueTuple2._2);
+ }
+ }).saveAsNewAPIHadoopDataset(job.getConfiguration());
- Map<String, String> counterMap = Maps.newHashMap();
- counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+ logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
- // save counter to hdfs
- HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+ }
}
static class HFilePartitioner extends Partitioner {