Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/30 09:41:41 UTC

[kylin] branch master updated: KYLIN-3597 Close JavaSparkContext after use.

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new f859e43  KYLIN-3597 Close JavaSparkContext after use.
f859e43 is described below

commit f859e4363f4fea381ee65538f76c37882750a531
Author: Lijun Cao <64...@qq.com>
AuthorDate: Sat Sep 29 11:46:19 2018 +0800

    KYLIN-3597 Close JavaSparkContext after use.
---
 .../kylin/engine/spark/SparkCubingMerge.java       | 201 +++++++++++----------
 .../kylin/engine/spark/SparkFactDistinct.java      |  96 +++++-----
 .../kylin/engine/spark/SparkMergingDictionary.java |  42 +++--
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 173 +++++++++---------
 4 files changed, 259 insertions(+), 253 deletions(-)

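Every file in this commit gets the same fix: the JavaSparkContext is now created in a try-with-resources statement, so the context is closed (and the underlying SparkContext stopped) even if the job body throws. A minimal, self-contained sketch of that pattern follows; JavaSparkContext implements java.io.Closeable, and the class name, app name, and sample data here are illustrative, not taken from the diff.

    // Minimal sketch of the pattern applied in this commit: JavaSparkContext
    // implements java.io.Closeable, so try-with-resources guarantees close()
    // (and therefore SparkContext.stop()) runs even when the job body throws.
    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CloseContextSketch {                       // illustrative class name
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("close-after-use-sketch");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                long count = sc.parallelize(Arrays.asList(1, 2, 3)).count();
                System.out.println("count=" + count);
            } // sc.close() is invoked here, with or without an exception
        }
    }
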
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 0b03f70..5037647 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -111,127 +111,128 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
-        KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        sc.sc().addSparkListener(jobListener);
-
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
-
-        logger.info("Input path: {}", inputPath);
-        logger.info("Output path: {}", outputPath);
-
-        final Job job = Job.getInstance(sConf.get());
-
-        SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
-
-        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
-        final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
-            @Override
-            public Object[] call(Object[] input1, Object[] input2) throws Exception {
-                Object[] measureObjs = new Object[input1.length];
-                aggregators.aggregate(input1, input2, measureObjs);
-                return measureObjs;
-            }
-        };
-
-        final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-            private transient volatile boolean initialized = false;
-            BufferedMeasureCodec codec;
-
-            @Override
-            public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
-                    throws Exception {
-
-                if (initialized == false) {
-                    synchronized (SparkCubingMerge.class) {
-                        if (initialized == false) {
-                            synchronized (SparkCubingMerge.class) {
-                                if (initialized == false) {
-                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
-                                    try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
-                                            .setAndUnsetThreadLocalConfig(kylinConfig)) {
-                                        CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
-                                        codec = new BufferedMeasureCodec(desc.getMeasures());
-                                        initialized = true;
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+            KylinSparkJobListener jobListener = new KylinSparkJobListener();
+            sc.sc().addSparkListener(jobListener);
+
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+            logger.info("Input path: {}", inputPath);
+            logger.info("Output path: {}", outputPath);
+
+            final Job job = Job.getInstance(sConf.get());
+
+            SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+            final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+            final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
+                @Override
+                public Object[] call(Object[] input1, Object[] input2) throws Exception {
+                    Object[] measureObjs = new Object[input1.length];
+                    aggregators.aggregate(input1, input2, measureObjs);
+                    return measureObjs;
+                }
+            };
+
+            final PairFunction convertTextFunction = new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+                private transient volatile boolean initialized = false;
+                BufferedMeasureCodec codec;
+
+                @Override
+                public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<Text, Object[]> tuple2)
+                        throws Exception {
+
+                    if (initialized == false) {
+                        synchronized (SparkCubingMerge.class) {
+                            if (initialized == false) {
+                                synchronized (SparkCubingMerge.class) {
+                                    if (initialized == false) {
+                                        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+                                        try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                                                .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                                            CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                                            codec = new BufferedMeasureCodec(desc.getMeasures());
+                                            initialized = true;
+                                        }
                                     }
                                 }
                             }
                         }
                     }
+                    ByteBuffer valueBuf = codec.encode(tuple2._2());
+                    byte[] encodedBytes = new byte[valueBuf.position()];
+                    System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+                    return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
+                }
+            };
+
+            final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+            final String[] inputFolders = StringSplitter.split(inputPath, ",");
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            boolean isLegacyMode = false;
+            for (String inputFolder : inputFolders) {
+                Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+                if (fs.exists(baseCuboidPath) == false) {
+                    // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
+                    isLegacyMode = true;
+                    break;
                 }
-                ByteBuffer valueBuf = codec.encode(tuple2._2());
-                byte[] encodedBytes = new byte[valueBuf.position()];
-                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-                return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
-            }
-        };
-
-        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
-        final String[] inputFolders = StringSplitter.split(inputPath, ",");
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        boolean isLegacyMode = false;
-        for (String inputFolder : inputFolders) {
-            Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
-            if (fs.exists(baseCuboidPath) == false) {
-                // doesn't exist sub folder, that means the merged cuboid in one folder (not by layer)
-                isLegacyMode = true;
-                break;
-            }
-        }
-
-        if (isLegacyMode == true) {
-            // merge all layer's cuboid at once, this might be hard for Spark
-            List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
-            for (int i = 0; i < inputFolders.length; i++) {
-                String path = inputFolders[i];
-                JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
-                CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
-                // re-encode with new dictionaries
-                JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
-                        sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
-                mergingSegs.add(newEcoddedRdd);
             }
 
-            FileOutputFormat.setOutputPath(job, new Path(outputPath));
-            sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
-                    .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
-                    .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
-
-        } else {
-            // merge by layer
-            for (int level = 0; level <= totalLevels; level++) {
-                List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+            if (isLegacyMode == true) {
+                // merge all layer's cuboid at once, this might be hard for Spark
+                List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
                 for (int i = 0; i < inputFolders.length; i++) {
                     String path = inputFolders[i];
+                    JavaPairRDD segRdd = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
                     CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
-                    final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
-                    JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
                     // re-encode with new dictionaries
                     JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
                             sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
                     mergingSegs.add(newEcoddedRdd);
                 }
 
-                final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
-                FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
-
+                FileOutputFormat.setOutputPath(job, new Path(outputPath));
                 sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
-                        .reduceByKey(reduceFunction,
-                                SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+                        .reduceByKey(reduceFunction, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
                         .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+            } else {
+                // merge by layer
+                for (int level = 0; level <= totalLevels; level++) {
+                    List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+                    for (int i = 0; i < inputFolders.length; i++) {
+                        String path = inputFolders[i];
+                        CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                        final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+                        JavaPairRDD<Text, Text> segRdd = sc.sequenceFile(cuboidInputPath, Text.class, Text.class);
+                        // re-encode with new dictionaries
+                        JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+                                sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                        mergingSegs.add(newEcoddedRdd);
+                    }
+
+                    final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+                    FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+
+                    sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+                            .reduceByKey(reduceFunction,
+                                    SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig))
+                            .mapToPair(convertTextFunction).saveAsNewAPIHadoopDataset(job.getConfiguration());
+                }
             }
+            // output the data size to console, job engine will parse and save the metric
+            // please note: this mechanism won't work when spark.submit.deployMode=cluster
+            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
         }
-        // output the data size to console, job engine will parse and save the metric
-        // please note: this mechanism won't work when spark.submit.deployMode=cluster
-        logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
     }
 
     static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
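
The convertTextFunction above builds its BufferedMeasureCodec lazily on the executor, guarded by a volatile flag with double-checked locking (the diff preserves a redundant extra nested synchronized block from the original code). Below is a simplified sketch of that initialization as a class fragment: the field and Kylin class names mirror the diff, while the single synchronized block and the hypothetical ensureCodec() helper are illustrative and assume the imports already present in SparkCubingMerge.java.

    // Simplified sketch of the lazy codec initialization; field and class names
    // mirror the diff, but the hypothetical ensureCodec() helper and the single
    // synchronized block (instead of the diff's nested ones) are illustrative.
    private transient volatile boolean initialized = false;
    private BufferedMeasureCodec codec;

    private void ensureCodec(SerializableConfiguration sConf, String metaUrl, String cubeName) {
        if (!initialized) {
            synchronized (SparkCubingMerge.class) {
                if (!initialized) {
                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
                    try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
                            .setAndUnsetThreadLocalConfig(kylinConfig)) {
                        CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                        codec = new BufferedMeasureCodec(desc.getMeasures());
                        initialized = true;
                    }
                }
            }
        }
    }
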
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 043f479..5cfd2d7 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -164,73 +164,75 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        sc.sc().addSparkListener(jobListener);
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
 
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
 
-        final Job job = Job.getInstance(sConf.get());
+            final Job job = Job.getInstance(sConf.get());
 
-        final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeInstance);
+            final FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(
+                    cubeInstance);
 
-        logger.info("RDD Output path: {}", outputPath);
-        logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
-        logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
-        logger.info("counter path {}", counterPath);
+            logger.info("RDD Output path: {}", outputPath);
+            logger.info("getTotalReducerNum: {}", reducerMapping.getTotalReducerNum());
+            logger.info("getCuboidRowCounterReducerNum: {}", reducerMapping.getCuboidRowCounterReducerNum());
+            logger.info("counter path {}", counterPath);
 
-        boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
+            boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE
+                    .equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        // calculate source record bytes size
-        final LongAccumulator bytesWritten = sc.sc().longAccumulator();
+            // calculate source record bytes size
+            final LongAccumulator bytesWritten = sc.sc().longAccumulator();
 
-        final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
+            final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);
 
-        JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
-                new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
+            JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
+                    new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));
 
-        JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD
-                .groupByKey(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
+            JavaPairRDD<SelfDefineSortableKey, Iterable<Text>> aggredRDD = flatOutputRDD.groupByKey(
+                    new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));
 
-        JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
-                .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
+            JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
+                    .mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));
 
-        // make each reducer output to respective dir
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
-                NullWritable.class, Text.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
-                NullWritable.class, ArrayPrimitiveWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
-                LongWritable.class, BytesWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
-                NullWritable.class, LongWritable.class);
+            // make each reducer output to respective dir
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class,
+                    NullWritable.class, Text.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class,
+                    NullWritable.class, ArrayPrimitiveWritable.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class,
+                    LongWritable.class, BytesWritable.class);
+            MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class,
+                    NullWritable.class, LongWritable.class);
 
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
-        FileOutputFormat.setCompressOutput(job, false);
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            FileOutputFormat.setCompressOutput(job, false);
 
-        // prevent to create zero-sized default output
-        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+            // prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
 
+            MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
 
-        MultipleOutputsRDD multipleOutputsRDD = MultipleOutputsRDD.rddToMultipleOutputsRDD(outputRDD);
+            multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
 
-        multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
+            long recordCount = recordRDD.count();
+            logger.info("Map input records={}", recordCount);
+            logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
 
-        long recordCount = recordRDD.count();
-        logger.info("Map input records={}", recordCount);
-        logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
+            counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
 
-        Map<String, String> counterMap = Maps.newHashMap();
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
-        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
 
-        // save counter to hdfs
-        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
-
-        HadoopUtil.deleteHDFSMeta(metaUrl);
+            HadoopUtil.deleteHDFSMeta(metaUrl);
+        }
     }
 
     static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
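
SparkFactDistinct tallies the source record size with a Spark LongAccumulator inside its flat-output function, reads the total on the driver only after an action has run, and then persists the counters for the job engine. A minimal sketch of the accumulator part alone is below; the app name and sample data are illustrative, and the counter-file write is omitted.

    // Minimal sketch of the accumulator usage shown above; sample data and the
    // app name are illustrative, and the counter-file write is omitted.
    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.util.LongAccumulator;

    public class AccumulatorSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("accumulator-sketch");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                LongAccumulator bytesRead = sc.sc().longAccumulator();
                long records = sc.parallelize(Arrays.asList("a", "bb", "ccc"))
                        .map(s -> { bytesRead.add(s.length()); return s; })
                        .count();
                // Accumulator totals are reliable on the driver only after an action has run.
                System.out.println("records=" + records + ", bytes=" + bytesRead.value());
            }
        }
    }
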
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
index 37f957f..bbdeb85 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -124,36 +124,38 @@ public class SparkMergingDictionary extends AbstractApplication implements Seria
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        sc.sc().addSparkListener(jobListener);
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            KylinSparkJobListener jobListener = new KylinSparkJobListener();
+            sc.sc().addSparkListener(jobListener);
 
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
 
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
 
-        logger.info("Dictionary output path: {}", dictOutputPath);
-        logger.info("Statistics output path: {}", statOutputPath);
+            logger.info("Dictionary output path: {}", dictOutputPath);
+            logger.info("Statistics output path: {}", statOutputPath);
 
-        final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
-        final int columnLength = tblColRefs.length;
+            final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+            final int columnLength = tblColRefs.length;
 
-        List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+            List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
 
-        for (int i = 0; i <= columnLength; i++) {
-            indexs.add(i);
-        }
+            for (int i = 0; i <= columnLength; i++) {
+                indexs.add(i);
+            }
 
-        JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+            JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
 
-        JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName, metaUrl,
-                segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
+            JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName,
+                    metaUrl, segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
 
-        colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+            colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class,
+                    SequenceFileOutputFormat.class);
+        }
     }
 
     public static class MergeDictAndStatsFunction implements PairFunction<Integer, Text, Text> {
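
SparkMergingDictionary fans out one task per dictionary column (plus one extra index for the statistics) by parallelizing the column indexes with one partition each, then coalesces to a single partition so the column-to-dictionary-path pairs land in one SequenceFile. A rough sketch of that shape is below, with placeholder data, key/value contents, and output path.

    // Rough sketch of one-partition-per-task fan-out and a single SequenceFile
    // output; the data, key/value contents, and output path are placeholders.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class MergeDictSketch {
        public static void main(String[] args) {
            int columnLength = 3;
            List<Integer> indexes = new ArrayList<>();
            for (int i = 0; i <= columnLength; i++) { // the extra index carries the statistics task
                indexes.add(i);
            }
            SparkConf conf = new SparkConf().setAppName("merge-dict-sketch");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                sc.parallelize(indexes, columnLength + 1)   // one partition (task) per index
                        .mapToPair(i -> new Tuple2<>(new Text("col-" + i), new Text("dict-path-" + i)))
                        .coalesce(1, false)                 // collapse to a single output file
                        .saveAsNewAPIHadoopFile("/tmp/dict-out", Text.class, Text.class,
                                SequenceFileOutputFormat.class);
            }
        }
    }
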
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index e2d43ba..96690d0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -134,112 +134,113 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        sc.sc().addSparkListener(jobListener);
-        final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
-        if (!fs.exists(partitionFilePath)) {
-            throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
-        }
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
+            sc.sc().addSparkListener(jobListener);
+            final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
+            if (!fs.exists(partitionFilePath)) {
+                throw new IllegalArgumentException("File not exist: " + partitionFilePath.toString());
+            }
 
-        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
 
-        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
-        final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
-        final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
+            final MeasureCodec inputCodec = new MeasureCodec(cubeDesc.getMeasures());
+            final List<KeyValueCreator> keyValueCreators = Lists.newArrayList();
 
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+                for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                    keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
+                }
             }
-        }
 
-        final int cfNum = keyValueCreators.size();
-        final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
-
-        logger.info("Input path: {}", inputPath);
-        logger.info("Output path: {}", outputPath);
-        // read partition split keys
-        List<RowKeyWritable> keys = new ArrayList<>();
-        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
-            RowKeyWritable key = new RowKeyWritable();
-            Writable value = NullWritable.get();
-            while (reader.next(key, value)) {
-                keys.add(key);
-                logger.info(" ------- split key: {}", key);
-                key = new RowKeyWritable(); // important, new an object!
+            final int cfNum = keyValueCreators.size();
+            final boolean quickPath = (keyValueCreators.size() == 1) && keyValueCreators.get(0).isFullCopy;
+
+            logger.info("Input path: {}", inputPath);
+            logger.info("Output path: {}", outputPath);
+            // read partition split keys
+            List<RowKeyWritable> keys = new ArrayList<>();
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
+                RowKeyWritable key = new RowKeyWritable();
+                Writable value = NullWritable.get();
+                while (reader.next(key, value)) {
+                    keys.add(key);
+                    logger.info(" ------- split key: {}", key);
+                    key = new RowKeyWritable(); // important, new an object!
+                }
             }
-        }
 
-        logger.info("There are {} split keys, totally {} hfiles", keys.size(), (keys.size() + 1));
+            logger.info("There are {} split keys, totally {} hfiles", keys.size(), (keys.size() + 1));
 
-        //HBase conf
-        logger.info("Loading HBase configuration from:{}", hbaseConfFile);
-        FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+            //HBase conf
+            logger.info("Loading HBase configuration from:{}", hbaseConfFile);
+            FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
 
-        Configuration hbaseJobConf = new Configuration();
-        hbaseJobConf.addResource(confInput);
-        hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-        Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
+            Configuration hbaseJobConf = new Configuration();
+            hbaseJobConf.addResource(confInput);
+            hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+            Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
 
-        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
 
-        JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
-        final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
-        if (quickPath) {
-            hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
-                    KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
-                            textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
-                    return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
-                }
-            });
-        } else {
-            hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                @Override
-                public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
-                        throws Exception {
-
-                    List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
-                    Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
-                    inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
-                            inputMeasures);
-
-                    for (int i = 0; i < cfNum; i++) {
-                        KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
-                        result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
-                                outputValue));
+            JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+            final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+            if (quickPath) {
+                hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+                        KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+                                textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+                        return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
                     }
+                });
+            } else {
+                hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                    @Override
+                    public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+                            throws Exception {
 
-                    return result.iterator();
-                }
-            });
-        }
+                        List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+                        Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+                        inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+                                inputMeasures);
 
-        hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
-                RowKeyWritable.RowKeyComparator.INSTANCE)
-                .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<ImmutableBytesWritable, KeyValue> call(
-                            Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
-                        return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
-                                rowKeyWritableKeyValueTuple2._2);
+                        for (int i = 0; i < cfNum; i++) {
+                            KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+                            result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+                                    outputValue));
+                        }
+
+                        return result.iterator();
                     }
-                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+                });
+            }
 
-        logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
+            hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+                    RowKeyWritable.RowKeyComparator.INSTANCE)
+                    .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                        @Override
+                        public Tuple2<ImmutableBytesWritable, KeyValue> call(
+                                Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+                            return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+                                    rowKeyWritableKeyValueTuple2._2);
+                        }
+                    }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 
-        Map<String, String> counterMap = Maps.newHashMap();
-        counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
 
-        // save counter to hdfs
-        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+        }
     }
 
     static class HFilePartitioner extends Partitioner {
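
SparkCubeHFile reads the region split keys from the partition file with a SequenceFile.Reader in try-with-resources and allocates a fresh key object per record, because the reader mutates the key instance in place. A self-contained sketch of that read loop is below, using Text in place of Kylin's RowKeyWritable and an illustrative input path.

    // Self-contained sketch of the split-key read loop above; Text stands in for
    // Kylin's RowKeyWritable, and the input path is illustrative.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class ReadSplitKeysSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            List<Text> keys = new ArrayList<>();
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                    SequenceFile.Reader.file(new Path("/tmp/hfile_splits")))) {
                Text key = new Text();
                Writable value = NullWritable.get();
                while (reader.next(key, value)) {
                    keys.add(key);
                    key = new Text(); // the reader reuses the key instance, so allocate a new one
                }
            }
            System.out.println("split keys: " + keys.size());
        }
    }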