Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/09 01:54:04 UTC
[kylin] branch master updated: KYLIN-3597 Close resources after they are used.
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new ee93848 KYLIN-3597 Close resources after they are used.
ee93848 is described below
commit ee93848cd7e5c0cbadb0d3701a7c92172b3546f2
Author: Lijun Cao <64...@qq.com>
AuthorDate: Mon Oct 8 16:42:05 2018 +0800
KYLIN-3597 Close resources after they are used.
---
.../kylin/common/persistence/JDBCResourceDAO.java | 6 +-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 97 +++++++++++-----------
2 files changed, 53 insertions(+), 50 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
index dce0894..70a049b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -442,9 +442,11 @@ public class JDBCResourceDAO {
}
private boolean checkTableExists(final String tableName, final Connection connection) throws SQLException {
- final PreparedStatement ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
- final ResultSet rs = ps.executeQuery();
+ PreparedStatement ps = null;
+ ResultSet rs = null;
try {
+ ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
+ rs = ps.executeQuery();
while (rs.next()) {
if (tableName.equals(rs.getString(1))) {
return true;
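Context for the hunk above: it shows only the acquisition side of the change; the release side (the finally block that closes rs and ps) lies outside the excerpted diff context. A minimal sketch of the method's resulting shape follows, where the finally body and the trailing return are assumptions about the unexcerpted code, not a quote of the commit; getCheckTableExistsSql is the class's existing helper:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    private boolean checkTableExists(final String tableName, final Connection connection)
            throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            ps = connection.prepareStatement(getCheckTableExistsSql(tableName));
            rs = ps.executeQuery();
            while (rs.next()) {
                if (tableName.equals(rs.getString(1))) {
                    return true;
                }
            }
            return false; // assumed: remainder of method not shown in the hunk
        } finally {
            // Close in reverse order of acquisition; the null checks keep this
            // safe when prepareStatement or executeQuery threw before assignment.
            if (rs != null) {
                rs.close();
            }
            if (ps != null) {
                ps.close();
            }
        }
    }

Moving the prepareStatement/executeQuery calls inside the try is the substantive part of the hunk: in the old shape they ran before the try, so an exception from executeQuery could propagate without any cleanup of the already-created PreparedStatement.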
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index 96690d0..fd32db5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -180,58 +180,59 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
//HBase conf
logger.info("Loading HBase configuration from:{}", hbaseConfFile);
- FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
-
- Configuration hbaseJobConf = new Configuration();
- hbaseJobConf.addResource(confInput);
- hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
- Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
-
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
-
- JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
- final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
- if (quickPath) {
- hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
- KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
- textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
- return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
- }
- });
- } else {
- hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
- throws Exception {
-
- List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
- Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
- inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
- inputMeasures);
-
- for (int i = 0; i < cfNum; i++) {
- KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
- result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
- outputValue));
- }
- return result.iterator();
- }
- });
- }
+ try (FSDataInputStream confInput = fs.open(new Path(hbaseConfFile))) {
+ Configuration hbaseJobConf = new Configuration();
+ hbaseJobConf.addResource(confInput);
+ hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+ Job job = Job.getInstance(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
+
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
- hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
- RowKeyWritable.RowKeyComparator.INSTANCE)
- .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+ JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+ final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+ if (quickPath) {
+ hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
@Override
- public Tuple2<ImmutableBytesWritable, KeyValue> call(
- Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
- return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
- rowKeyWritableKeyValueTuple2._2);
+ public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+ KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+ textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+ return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
}
- }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ });
+ } else {
+ hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+ throws Exception {
+
+ List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+ Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+ inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+ inputMeasures);
+
+ for (int i = 0; i < cfNum; i++) {
+ KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+ result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+ outputValue));
+ }
+
+ return result.iterator();
+ }
+ });
+ }
+
+ hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+ RowKeyWritable.RowKeyComparator.INSTANCE)
+ .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(
+ Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+ return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+ rowKeyWritableKeyValueTuple2._2);
+ }
+ }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+ }
logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
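The SparkCubeHFile change reaches the same goal via try-with-resources: the configuration stream is opened in the resource header, and everything that needs the open stream (loading the HBase conf, building the Job, running the RDD pipeline) stays inside the block, so the stream is closed on every exit path, including exceptions. A minimal standalone sketch of the idiom, assuming a FileSystem fs and a String hbaseConfFile as in the surrounding code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;

    Configuration hbaseJobConf = new Configuration();
    try (FSDataInputStream confInput = fs.open(new Path(hbaseConfFile))) {
        // The HBase conf is loaded while the stream is still open.
        hbaseJobConf.addResource(confInput);
        // ... build the Job and run the Spark pipeline here, as in the diff ...
    } // confInput.close() runs automatically here, even after an exception

Compared with the old shape, where no close of confInput appears among the removed lines, the try-with-resources form guarantees the close without cluttering the job-building logic with a manual finally.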