You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/09 01:54:04 UTC

[kylin] branch master updated: KYLIN-3597 Close resources after they are used.

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new ee93848  KYLIN-3597 Close resources after they are used.
ee93848 is described below

commit ee93848cd7e5c0cbadb0d3701a7c92172b3546f2
Author: Lijun Cao <64...@qq.com>
AuthorDate: Mon Oct 8 16:42:05 2018 +0800

    KYLIN-3597 Close resources after they are used.
---
 .../kylin/common/persistence/JDBCResourceDAO.java  |  6 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 97 +++++++++++-----------
 2 files changed, 53 insertions(+), 50 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
index dce0894..70a049b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -442,9 +442,11 @@ public class JDBCResourceDAO {
             }
 
             private boolean checkTableExists(final String tableName, final Connection connection) throws SQLException {
-                final PreparedStatement ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
-                final ResultSet rs = ps.executeQuery();
+                PreparedStatement ps = null;
+                ResultSet rs = null;
                 try {
+                    ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
+                    rs = ps.executeQuery();
                     while (rs.next()) {
                         if (tableName.equals(rs.getString(1))) {
                             return true;
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index 96690d0..fd32db5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -180,58 +180,59 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
 
             //HBase conf
             logger.info("Loading HBase configuration from:{}", hbaseConfFile);
-            FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
-
-            Configuration hbaseJobConf = new Configuration();
-            hbaseJobConf.addResource(confInput);
-            hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-            Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
-
-            FileOutputFormat.setOutputPath(job, new Path(outputPath));
-
-            JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
-            final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
-            if (quickPath) {
-                hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                    @Override
-                    public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
-                        KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
-                                textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
-                        return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
-                    }
-                });
-            } else {
-                hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
-                    @Override
-                    public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
-                            throws Exception {
-
-                        List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
-                        Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
-                        inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
-                                inputMeasures);
-
-                        for (int i = 0; i < cfNum; i++) {
-                            KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
-                            result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
-                                    outputValue));
-                        }
 
-                        return result.iterator();
-                    }
-                });
-            }
+            try (FSDataInputStream confInput = fs.open(new Path(hbaseConfFile))) {
+                Configuration hbaseJobConf = new Configuration();
+                hbaseJobConf.addResource(confInput);
+                hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+                Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
+
+                FileOutputFormat.setOutputPath(job, new Path(outputPath));
 
-            hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
-                    RowKeyWritable.RowKeyComparator.INSTANCE)
-                    .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+                final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+                if (quickPath) {
+                    hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
                         @Override
-                        public Tuple2<ImmutableBytesWritable, KeyValue> call(
-                                Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
-                            return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
-                                    rowKeyWritableKeyValueTuple2._2);
+                        public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+                            KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+                                    textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+                            return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
                         }
-                    }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+                    });
+                } else {
+                    hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+                        @Override
+                        public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+                                throws Exception {
+
+                            List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+                            Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+                            inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+                                    inputMeasures);
+
+                            for (int i = 0; i < cfNum; i++) {
+                                KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+                                result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+                                        outputValue));
+                            }
+
+                            return result.iterator();
+                        }
+                    });
+                }
+
+                hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+                        RowKeyWritable.RowKeyComparator.INSTANCE)
+                        .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+                            @Override
+                            public Tuple2<ImmutableBytesWritable, KeyValue> call(
+                                    Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+                                return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+                                        rowKeyWritableKeyValueTuple2._2);
+                            }
+                        }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+            }
 
             logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());