You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kylin.apache.org by GitBox <gi...@apache.org> on 2018/09/30 09:41:37 UTC

[GitHub] shaofengshi closed pull request #278: KYLIN-3597 Close JavaSparkContext after used.

shaofengshi closed pull request #278: KYLIN-3597 Close JavaSparkContext after used.
URL: https://github.com/apache/kylin/pull/278
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it once the PR is merged):

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 0b03f7077d..5037647a6f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -111,127 +111,128 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
-        KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        sc.sc().addSparkListener(jobListener);
-
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
-
-        logger.info("Input path: {}", inputPath);
-        logger.info("Output path: {}", outputPath);
-
-        final Job job = Job.getInstance(sConf.get());
-
-        SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
-
-        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
-        final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
-            @Override
-            public Object[] call(Object[] input1, Object[] input2) throws Exception {
-                Object[] measureObjs = new Object[input1.length];
-                aggregators.aggregate(input1, input2, measureObjs);
-                return measureObjs;
-            }
-        };
-
-        final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-            private transient volatile boolean initialized = false;
-            BufferedMeasureCodec codec;
-
-            @Override
-            public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
-                    throws Exception {
-
-                if (initialized == false) {
-                    synchronized (SparkCubingMerge.class) {
-                        if (initialized == false) {
-                            synchronized (SparkCubingMerge.class) {
-                                if (initialized == false) {
-                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                                    try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
-                                            .setAndUnsetThreadLocalConfig(kylinConfig)) {
-                                        CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                                        codec = new BufferedMeasureCodec(desc.getMeasures());
-                                        initialized = true;
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+            KylinSparkJobListener jobListener = new KylinSparkJobListener();
+            sc.sc().addSparkListener(jobListener);
+
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+            logger.info("Input path: {}", inputPath);
+            logger.info("Output path: {}", outputPath);
+
+            final Job job = Job.getInstance(sConf.get());
+
+            SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+            final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+            final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
+                @Override
+                public Object[] call(Object[] input1, Object[] input2) throws Exception {
+                    Object[] measureObjs = new Object[input1.length];
+                    aggregators.aggregate(input1, input2, measureObjs);
+                    return measureObjs;
+                }
+            };
+
+            final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+                private transient volatile boolean initialized = false;
+                BufferedMeasureCodec codec;
+
+                @Override
+                public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
+                        throws Exception {
+
+                    if (initialized == false) {
+                        synchronized (SparkCubingMerge.class) {
+                            if (initialized == false) {
+                                synchronized (SparkCubingMerge.class) {
+                                    if (initialized == false) {
+                                        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                                        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                                                .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                                            CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                            codec = new BufferedMeasureCodec(desc.getMeasures());
+                                            initialized = true;
+                                        }
                                     }
                                 }
                             }
                         }
                     }
+                    ByteBuffer valueBuf = codec.encode(tuple2._2());
+                    byte[] encodedBytes = new byte[valueBuf.position()];
+                    System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+                    return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
+                }
+            };
+
+            final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+            final String[] inputFolders = StringSplitter.split(inputPath, ",");
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            boolean isLegacyMode = false;
+            for (String inputFolder : inputFolders) {
+                Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+                if (fs.exists(baseCuboidPath) == false) {
+                    // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
+                    isLegacyMode = true;
+                    break;
                 }
-                ByteBuffer valueBuf = codec.encode(tuple2._2());
-                byte[] encodedBytes = new byte[valueBuf.position()];
-                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-                return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
-            }
-        };
-
-        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
-        final String[] inputFolders = StringSplitter.split(inputPath, ",");
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        boolean isLegacyMode = false;
-        for (String inputFolder : inputFolders) {
-            Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
-            if (fs.exists(baseCuboidPath) == false) {
-                // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
-                isLegacyMode = true;
-                break;
-            }
-        }
-
-        if (isLegacyMode == true) {
-            // merge all layer's cuboid at once, this might be hard for Spark
-            List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
-            for (int i = 0; i < inputFolders.length; i++) {
-                String path = inputFolders[i];
-                JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
-                CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
-                // re-encode with new dictionaries
-                JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
-                        sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
-                mergingSegs.add(newEcoddedRdd);
             }
 
-            FileOutputFormat.setOutputPath(job, new Path(outputPath));
-            sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
-                    .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
-                    .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
-
-        } else {
-            // merge by layer
-            for (int level = 0; level <= totalLevels; level++) {
-                List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+            if (isLegacyMode == true) {
+                // merge all layer's cuboid at once, this might be hard for Spark
+                List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
                 for (int i = 0; i < inputFolders.length; i++) {
                     String path = inputFolders[i];
+                    JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
                     CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
-                    final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
-                    JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
                     // re-encode with new dictionaries
                     JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
                             sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
                     mergingSegs.add(newEcoddedRdd);
                 }
 
-                final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
-                FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
-
+                FileOutputFormat.setOutputPath(job, new Path(outputPath));
                 sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
-                        .reduceByKey(reduceFunction,
-                                SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+                        .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
                         .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+            } else {
+                // merge by layer
+                for (int level = 0; level <= totalLevels; level++) {
+                    List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+                    for (int i = 0; i < inputFolders.length; i++) {
+                        String path = inputFolders[i];
+                        CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                        final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+                        JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
+                        // re-encode with new dictionaries
+                        JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                                sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                        mergingSegs.add(newEcoddedRdd);
+                    }
+
+                    final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+                    FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+                    sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+                            .reduceByKey(reduceFunction,
+                                    SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+                            .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+                }
             }
+            // output the data size to console, job engine will parse and save the metric
+            // please note: this mechanism won't work when spark.submit.deployMode=cluster
+            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
         }
-        // output the data size to console, job engine will parse and save the metric
-        // please note: this mechanism won't work when spark.submit.deployMode=cluster
-        logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
     }
 
     static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 043f479e7d..5cfd2d7ccb 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -164,73 +164,75 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        sc.sc().addSparkListener(jobListener);
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
 
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
 
-        final Job job = Job.getInstance(sConf.get());
+            final Job job = Job.getInstance(sConf.get());
 
-        final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+            final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(
+                    cubeInstance);
 
-        logger.info("RDD Output path: {}", outputPath);
-        logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
-        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
-        logger.info("counter path {}", counterPath);
+            logger.info("RDD Output path: {}", outputPath);
+            logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+            logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+            logger.info("counter path {}", counterPath);
 
-        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+            boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE
+                    .equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        // calculate source record bytes size
-        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+            // calculate source record bytes size
+            final LongAccumulator bytesWritten = sc.sc().longAccumulator();
 
-        final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+            final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
 
-        JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
-                new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+            JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
+                    new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
 
-        JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD
-                .groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+            JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(
+                    new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
 
-        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
-                .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+            JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
+                    .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
 
-        // make each reducer output to respective dir
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
-                NullWritable.class, Text.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
-                NullWritable.class, ArrayPrimitiveWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
-                LongWritable.class, BytesWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
-                NullWritable.class, LongWritable.class);
+            // make each reducer output to respective dir
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+                    NullWritable.class, Text.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+                    NullWritable.class, ArrayPrimitiveWritable.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+                    LongWritable.class, BytesWritable.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+                    NullWritable.class, LongWritable.class);
 
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
-        FileOutputFormat.setCompressOutput(job, false);
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            FileOutputFormat.setCompressOutput(job, false);
 
-        // prevent to create zero-sized default output
-        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+            // prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
 
+            MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
 
-        MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+            multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
 
-        multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+            long recordCount = recordRDD.count();
+            logger.info("Map input records={}", recordCount);
+            logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
 
-        long recordCount = recordRDD.count();
-        logger.info("Map input records={}", recordCount);
-        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
 
-        Map<String, String> counterMap = Maps.newHashMap();
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
 
-        // save counter to hdfs
-        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
-
-        HadoopUtil.deleteHDFSMeta(metaUrl);
+            HadoopUtil.deleteHDFSMeta(metaUrl);
+        }
     }
 
     static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 37f957fe31..bbdeb85ecc 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -124,36 +124,38 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        sc.sc().addSparkListener(jobListener);
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            KylinSparkJobListener jobListener = new KylinSparkJobListener();
+            sc.sc().addSparkListener(jobListener);
 
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
 
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
 
-        logger.info("Dictionary output path: {}", dictOutputPath);
-        logger.info("Statistics output path: {}", statOutputPath);
+            logger.info("Dictionary output path: {}", dictOutputPath);
+            logger.info("Statistics output path: {}", statOutputPath);
 
-        final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
-        final int columnLength = tblColRefs.length;
+            final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+            final int columnLength = tblColRefs.length;
 
-        List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+            List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
 
-        for (int i = 0; i <= columnLength; i++) {
-            indexs.add(i);
-        }
+            for (int i = 0; i <= columnLength; i++) {
+                indexs.add(i);
+            }
 
-        JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+            JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
 
-        JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName, metaUrl,
-                segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
+            JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName,
+                    metaUrl, segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
 
-        colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+            colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class,
+                    SequenceFileOutputFormat.class);
+        }
     }
 
     public static class MergeDictAndStatsFunction implements PairFunction<Integer, Text, Text> {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index e2d43ba1c2..96690d00de 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -134,112 +134,113 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        sc.sc().addSparkListener(jobListener);
-        final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
-        if (!fs.exists(partitionFilePath)) {
-            throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
-        }
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
+            if (!fs.exists(partitionFilePath)) {
+                throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
+            }
 
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
 
-        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
-        final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
-        final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+            final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
+            final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
 
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                    keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+                }
             }
-        }
 
-        final int cfNum = keyValueCreators.size();
-        final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
-
-        logger.info("Input path: {}", inputPath);
-        logger.info("Output path: {}", outputPath);
-        // read partition split keys
-        List<RowKeyWritable> keys = new ArrayList<>();
-        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
-            RowKeyWritable key = new RowKeyWritable();
-            Writable value = NullWritable.get();
-            while (reader.next(key, value)) {
-                keys.add(key);
-                logger.info(" ------- split key: {}", key);
-                key = new RowKeyWritable(); // important, new an object!
+            final int cfNum = keyValueCreators.size();
+            final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
+
+            logger.info("Input path: {}", inputPath);
+            logger.info("Output path: {}", outputPath);
+            // read partition split keys
+            List<RowKeyWritable> keys = new ArrayList<>();
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
+                RowKeyWritable key = new RowKeyWritable();
+                Writable value = NullWritable.get();
+                while (reader.next(key, value)) {
+                    keys.add(key);
+                    logger.info(" ------- split key: {}", key);
+                    key = new RowKeyWritable(); // important, new an object!
+                }
             }
-        }
 
-        logger.info("There are {} split keys, totally {} hfiles", keys.size(), (keys.size() + 1));
+            logger.info("There are {} split keys, totally {} hfiles", keys.size(), (keys.size() + 1));
 
-        //HBase conf
-        logger.info("Loading HBase configuration from:{}", hbaseConfFile);
-        FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+            //HBase conf
+            logger.info("Loading HBase configuration from:{}", hbaseConfFile);
+            FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
 
-        Configuration hbaseJobConf = new Configuration();
-        hbaseJobConf.addResource(confInput);
-        hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-        Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
+            Configuration hbaseJobConf = new Configuration();
+            hbaseJobConf.addResource(confInput);
+            hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+            Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
 
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
 
-        JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
-        final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
-        if (quickPath) {
-            hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
-                    KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
-                            textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
-                    return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
-                }
-            });
-        } else {
-            hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
-                        throws Exception {
-
-                    List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
-                    Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
-                    inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
-                            inputMeasures);
-
-                    for (int i = 0; i < cfNum; i++) {
-                        KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
-                        result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
-                                outputValue));
+            JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+            final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+            if (quickPath) {
+                hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+                        KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+                                textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+                        return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
                     }
+                });
+            } else {
+                hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+                            throws Exception {
 
-                    return result.iterator();
-                }
-            });
-        }
+                        List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+                        Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+                        inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+                                inputMeasures);
 
-        hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
-                RowKeyWritable.RowKeyComparator.INSTANCE)
-                .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
-                            Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
-                        return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
-                                rowKeyWritableKeyValueTuple2._2);
+                        for (int i = 0; i < cfNum; i++) {
+                            KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+                            result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+                                    outputValue));
+                        }
+
+                        return result.iterator();
                     }
-                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+                });
+            }
 
-        logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
+            hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+                    RowKeyWritable.RowKeyComparator.INSTANCE)
+                    .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                        @Override
+                        public Tuple2<ImmutableBytesWritable, KeyValue> call(
+                                Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+                            return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+                                    rowKeyWritableKeyValueTuple2._2);
+                        }
+                    }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 
-        Map<String, String> counterMap = Maps.newHashMap();
-        counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
 
-        // save counter to hdfs
-        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+        }
     }
 
     static class HFilePartitioner extends Partitioner {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services