Posted to dev@kylin.apache.org by GitBox <gi...@apache.org> on 2018/09/29 05:29:48 UTC

[GitHub] caolijun1166 closed pull request #273: clearKylin 3597

caolijun1166 closed pull request #273: clearKylin 3597
URL: https://github.com/apache/kylin/pull/273

This is a pull request from a forked repository. Because GitHub does not show
the original diff for a closed foreign pull request, it is reproduced below
for the sake of provenance:

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 043f479e7d..5cfd2d7ccb 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -164,73 +164,75 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        sc.sc().addSparkListener(jobListener);
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
 
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
 
-        final Job job = Job.getInstance(sConf.get());
+            final Job job = Job.getInstance(sConf.get());
 
-        final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+            final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(
+                    cubeInstance);
 
-        logger.info("RDD Output path: {}", outputPath);
-        logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
-        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
-        logger.info("counter path {}", counterPath);
+            logger.info("RDD Output path: {}", outputPath);
+            logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+            logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+            logger.info("counter path {}", counterPath);
 
-        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+            boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE
+                    .equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        // calculate source record bytes size
-        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+            // calculate source record bytes size
+            final LongAccumulator bytesWritten = sc.sc().longAccumulator();
 
-        final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+            final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
 
-        JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
-                new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+            JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
+                    new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
 
-        JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD
-                .groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+            JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(
+                    new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
 
-        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
-                .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+            JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
+                    .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
 
-        // make each reducer output to respective dir
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
-                NullWritable.class, Text.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
-                NullWritable.class, ArrayPrimitiveWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
-                LongWritable.class, BytesWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
-                NullWritable.class, LongWritable.class);
+            // make each reducer output to respective dir
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+                    NullWritable.class, Text.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+                    NullWritable.class, ArrayPrimitiveWritable.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+                    LongWritable.class, BytesWritable.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+                    NullWritable.class, LongWritable.class);
 
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
-        FileOutputFormat.setCompressOutput(job, false);
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            FileOutputFormat.setCompressOutput(job, false);
 
-        // prevent to create zero-sized default output
-        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+            // prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
 
+            MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
 
-        MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+            multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
 
-        multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+            long recordCount = recordRDD.count();
+            logger.info("Map input records={}", recordCount);
+            logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
 
-        long recordCount = recordRDD.count();
-        logger.info("Map input records={}", recordCount);
-        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
 
-        Map<String, String> counterMap = Maps.newHashMap();
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
 
-        // save counter to hdfs
-        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
-
-        HadoopUtil.deleteHDFSMeta(metaUrl);
+            HadoopUtil.deleteHDFSMeta(metaUrl);
+        }
     }
 
     static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 37f957fe31..bbdeb85ecc 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -124,36 +124,38 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        sc.sc().addSparkListener(jobListener);
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            KylinSparkJobListener jobListener = new KylinSparkJobListener();
+            sc.sc().addSparkListener(jobListener);
 
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
 
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
 
-        logger.info("Dictionary output path: {}", dictOutputPath);
-        logger.info("Statistics output path: {}", statOutputPath);
+            logger.info("Dictionary output path: {}", dictOutputPath);
+            logger.info("Statistics output path: {}", statOutputPath);
 
-        final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
-        final int columnLength = tblColRefs.length;
+            final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+            final int columnLength = tblColRefs.length;
 
-        List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+            List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
 
-        for (int i = 0; i <= columnLength; i++) {
-            indexs.add(i);
-        }
+            for (int i = 0; i <= columnLength; i++) {
+                indexs.add(i);
+            }
 
-        JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+            JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
 
-        JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName, metaUrl,
-                segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
+            JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName,
+                    metaUrl, segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
 
-        colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+            colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class,
+                    SequenceFileOutputFormat.class);
+        }
     }
 
     public static class MergeDictAndStatsFunction implements PairFunction<Integer, Text, Text> {
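
For readers skimming the diff above: the substantive change in both
SparkFactDistinct and SparkMergingDictionary is that the JavaSparkContext is
now managed with a try-with-resources block, so the context is stopped even
when the job body throws; most of the remaining +/- lines are re-indentation.
A minimal standalone sketch of the same pattern is below, using only the
public Spark Java API; the class name and sample data are illustrative and
not taken from the Kylin code.

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class TryWithResourcesSparkSketch {

        public static void main(String[] args) {
            // Illustrative app name; add setMaster("local[*]") for a local test run.
            SparkConf conf = new SparkConf().setAppName("TryWithResourcesSparkSketch");

            // JavaSparkContext implements java.io.Closeable, and close() delegates
            // to stop(), so the context is shut down on both the normal and the
            // exceptional exit path, which is the behavior the PR relies on.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                long count = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
                System.out.println("count = " + count);
            }
            // No explicit sc.stop() is needed here; the resource block has
            // already closed the context.
        }
    }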


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services