You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/03/18 22:40:28 UTC
[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new 2e909cc PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
2e909cc is described below
commit 2e909cce32e26ccfa4981d42c651ba69b96b0126
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Thu Mar 14 23:16:14 2019 -0700
PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
---
.../UngroupedAggregateRegionObserver.java | 6 +-
.../hbase/index/write/RecoveryIndexWriter.java | 10 +--
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 15 ++---
.../apache/phoenix/mapreduce/OrphanViewTool.java | 73 +++++++++++++---------
.../phoenix/mapreduce/PhoenixRecordWriter.java | 18 ++++--
.../mapreduce/index/DirectHTableWriter.java | 14 ++++-
.../mapreduce/index/IndexScrutinyMapper.java | 24 +++++--
.../apache/phoenix/mapreduce/index/IndexTool.java | 55 +++++++++++-----
.../index/PhoenixIndexImportDirectMapper.java | 26 ++++----
.../mapreduce/index/PhoenixIndexImportMapper.java | 16 +++--
.../index/PhoenixIndexPartialBuildMapper.java | 25 +++++---
.../mapreduce/util/PhoenixConfigurationUtil.java | 45 ++++++-------
12 files changed, 209 insertions(+), 118 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 5923a75..dc7567b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -834,7 +834,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
try {
if (targetHTable != null) {
- targetHTable.close();
+ try {
+ targetHTable.close();
+ } catch (IOException e) {
+ logger.error("Closing table: " + targetHTable + " failed: ", e);
+ }
}
} finally {
try {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index 35f0a6d..fb96666 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -26,8 +26,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -55,15 +53,13 @@ public class RecoveryIndexWriter extends IndexWriter {
* Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
* before calling.
*
- * @param committer
* @param policy
* @param env
+ * @param name
* @throws IOException
- * @throws ZooKeeperConnectionException
- * @throws MasterNotRunningException
*/
public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
- throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+ throws IOException {
super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
this.admin = new HBaseAdmin(env.getConfiguration());
}
@@ -125,7 +121,7 @@ public class RecoveryIndexWriter extends IndexWriter {
try {
admin.close();
} catch (IOException e) {
- // closing silently
+ LOG.error("Closing the admin failed: ", e);
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 5252afb..8e18bf9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.mapreduce;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -37,19 +36,16 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
-import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -61,7 +57,6 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -69,7 +64,6 @@ import org.apache.phoenix.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
@@ -355,10 +349,11 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String tableName = table.getPhysicalName();
Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
- HTable htable = new HTable(conf,tableName);
- LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
- loader.doBulkLoad(tableOutputPath, htable);
- LOG.info("Incremental load complete for table=" + tableName);
+ try(HTable htable = new HTable(conf,tableName)) {
+ LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+ loader.doBulkLoad(tableOutputPath, htable);
+ LOG.info("Incremental load complete for table=" + tableName);
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
index 713fb05..fba01a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
@@ -393,27 +393,35 @@ public class OrphanViewTool extends Configured implements Tool {
}
}
- private void gracefullyDropView(PhoenixConnection phoenixConnection, Configuration configuration,
- Key key) throws Exception {
- PhoenixConnection tenantConnection;
- if (key.getTenantId() != null) {
- Properties tenantProps = new Properties();
- tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
- tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
- unwrap(PhoenixConnection.class);
- } else {
- tenantConnection = phoenixConnection;
- }
-
- MetaDataClient client = new MetaDataClient(tenantConnection);
- org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
- .create(key.getSchemaName(), key.getTableName());
+ private void gracefullyDropView(PhoenixConnection phoenixConnection,
+ Configuration configuration, Key key) throws Exception {
+ PhoenixConnection tenantConnection = null;
+ boolean newConn = false;
try {
- client.dropTable(
- new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
- }
- catch (TableNotFoundException e) {
- LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+ if (key.getTenantId() != null) {
+ Properties tenantProps = new Properties();
+ tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
+ tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
+ unwrap(PhoenixConnection.class);
+ newConn = true;
+ } else {
+ tenantConnection = phoenixConnection;
+ }
+
+ MetaDataClient client = new MetaDataClient(tenantConnection);
+ org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
+ .create(key.getSchemaName(), key.getTableName());
+ try {
+ client.dropTable(
+ new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
+ }
+ catch (TableNotFoundException e) {
+ LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+ }
+ } finally {
+ if (newConn) {
+ tryClosingConnection(tenantConnection);
+ }
}
}
@@ -775,14 +783,7 @@ public class OrphanViewTool extends Configured implements Tool {
}
private void closeConnectionAndFiles(Connection connection) throws IOException {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (SQLException sqle) {
- LOG.error("Failed to close connection ", sqle.getMessage());
- throw new RuntimeException("Failed to close connection");
- }
+ tryClosingConnection(connection);
for (byte i = VIEW; i < ORPHAN_TYPE_COUNT; i++) {
if (writer[i] != null) {
writer[i].close();
@@ -794,6 +795,22 @@ public class OrphanViewTool extends Configured implements Tool {
}
/**
+ * Try closing a connection if it is not null
+ * @param connection connection object
+ * @throws RuntimeException if closing the connection fails
+ */
+ private void tryClosingConnection(Connection connection) {
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.error("Failed to close connection: ", sqlE);
+ throw new RuntimeException("Failed to close connection with exception: ", sqlE);
+ }
+ }
+
+ /**
* Examples for input arguments:
* -c : cleans orphan views
* -c -op /tmp/ : cleans orphan views and links, and logs their names to the files named Orphan*.txt in /tmp/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 52f2fe3..b67ba74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -52,10 +52,20 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul
}
public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
- this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
- this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
- final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
- this.statement = this.conn.prepareStatement(upsertQuery);
+ Connection connection = null;
+ try {
+ connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
+ this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+ final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+ this.statement = connection.prepareStatement(upsertQuery);
+ this.conn = connection;
+ } catch (Exception e) {
+ // Only close the connection in case of an exception, so cannot use try-with-resources
+ if (connection != null) {
+ connection.close();
+ }
+ throw e;
+ }
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 1230f25..323a98b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.mapreduce.index;
+import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -62,6 +63,7 @@ public class DirectHTableWriter {
LOG.info("Created table instance for " + tableName);
} catch (IOException e) {
LOG.error("IOException : ", e);
+ tryClosingResourceSilently(this.table);
throw new RuntimeException(e);
}
}
@@ -96,7 +98,17 @@ public class DirectHTableWriter {
return table;
}
+ private void tryClosingResourceSilently(Closeable res) {
+ if (res != null) {
+ try {
+ res.close();
+ } catch (IOException e) {
+ LOG.error("Closing resource: " + res + " failed with error: ", e);
+ }
+ }
+ }
+
public void close() throws IOException {
- table.close();
+ tryClosingResourceSilently(this.table);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 81081bf..c651077 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -149,10 +149,23 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
LOG.info("Target table base query: " + targetTableQuery);
md5 = MessageDigest.getInstance("MD5");
} catch (SQLException | NoSuchAlgorithmException e) {
+ tryClosingResourceSilently(this.outputUpsertStmt);
+ tryClosingResourceSilently(this.connection);
+ tryClosingResourceSilently(this.outputConn);
throw new RuntimeException(e);
}
}
+ private static void tryClosingResourceSilently(AutoCloseable res) {
+ if (res != null) {
+ try {
+ res.close();
+ } catch (Exception e) {
+ LOG.error("Closing resource: " + res + " failed :", e);
+ }
+ }
+ }
+
@Override
protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
throws IOException, InterruptedException {
@@ -180,18 +193,21 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
+ tryClosingResourceSilently(this.outputUpsertStmt);
+ IOException throwException = null;
if (connection != null) {
try {
processBatch(context);
connection.close();
- if (outputConn != null) {
- outputConn.close();
- }
} catch (SQLException e) {
LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
- throw new IOException(e);
+ throwException = new IOException(e);
}
}
+ tryClosingResourceSilently(this.outputConn);
+ if (throwException != null) {
+ throw throwException;
+ }
}
private void processBatch(Context context)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index a71bc27..7791472 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -268,6 +268,12 @@ public class IndexTool extends Configured implements Tool {
}
+ void closeConnection() throws SQLException {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
+
public Job getJob() throws Exception {
if (isPartialBuild) {
return configureJobForPartialBuild();
@@ -516,10 +522,10 @@ public class IndexTool extends Configured implements Tool {
final Configuration configuration = job.getConfiguration();
final String physicalIndexTable =
PhoenixConfigurationUtil.getPhysicalTableName(configuration);
- final HTable htable = new HTable(configuration, physicalIndexTable);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
+ try(final HTable htable = new HTable(configuration, physicalIndexTable)) {
+ HFileOutputFormat.configureIncrementalLoad(job, htable);
+ }
return job;
-
}
/**
@@ -527,9 +533,6 @@ public class IndexTool extends Configured implements Tool {
* waits for the job completion based on runForeground parameter.
*
* @param job
- * @param outputPath
- * @param runForeground - if true, waits for job completion, else submits and returns
- * immediately.
* @return
* @throws Exception
*/
@@ -562,6 +565,7 @@ public class IndexTool extends Configured implements Tool {
public int run(String[] args) throws Exception {
Connection connection = null;
HTable htable = null;
+ JobFactory jobFactory = null;
try {
CommandLine cmdLine = null;
try {
@@ -576,13 +580,14 @@ public class IndexTool extends Configured implements Tool {
tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
- connection = ConnectionUtil.getInputConnection(configuration);
schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+ try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
+ pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
+ }
useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -609,7 +614,6 @@ public class IndexTool extends Configured implements Tool {
}
htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(pIndexTable.getPhysicalName().getBytes());
-
if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
isLocalIndexBuild = true;
splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
@@ -637,7 +641,8 @@ public class IndexTool extends Configured implements Tool {
fs.delete(outputPath, true);
}
- job = new JobFactory(connection, configuration, outputPath).getJob();
+ jobFactory = new JobFactory(connection, configuration, outputPath);
+ job = jobFactory.getJob();
if (!isForeground && useDirectApi) {
LOG.info("Running Index Build in Background - Submit async and exit");
@@ -670,16 +675,36 @@ public class IndexTool extends Configured implements Tool {
+ ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
return -1;
} finally {
+ boolean rethrowException = false;
try {
if (connection != null) {
- connection.close();
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.error("Failed to close connection ", e);
+ rethrowException = true;
+ }
}
if (htable != null) {
- htable.close();
+ try {
+ htable.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close htable ", e);
+ rethrowException = true;
+ }
+ }
+ if (jobFactory != null) {
+ try {
+ jobFactory.closeConnection();
+ } catch (SQLException e) {
+ LOG.error("Failed to close jobFactory ", e);
+ rethrowException = true;
+ }
+ }
+ } finally {
+ if (rethrowException) {
+ throw new RuntimeException("Failed to close resource");
}
- } catch (SQLException sqle) {
- LOG.error("Failed to close connection ", sqle.getMessage());
- throw new RuntimeException("Failed to close connection");
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index e2ac491..e148f67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -105,7 +105,8 @@ public class PhoenixIndexImportDirectMapper extends
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.pStatement = connection.prepareStatement(upsertQuery);
- } catch (SQLException e) {
+ } catch (Exception e) {
+ tryClosingResources();
throw new RuntimeException(e);
}
}
@@ -179,17 +180,20 @@ public class PhoenixIndexImportDirectMapper extends
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
throw new RuntimeException(e);
} finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
- e.getMessage());
- }
- }
- if (writer != null) {
- writer.close();
+ tryClosingResources();
+ }
+ }
+
+ private void tryClosingResources() throws IOException {
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (SQLException e) {
+ LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
}
}
+ if (this.writer != null) {
+ this.writer.close();
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 353de7a..88ddc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -94,6 +94,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
this.pStatement = connection.prepareStatement(upsertQuery);
} catch (SQLException e) {
+ tryClosingConnection();
throw new RuntimeException(e.getMessage());
}
}
@@ -162,14 +163,17 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
- super.cleanup(context);
- if (connection != null) {
- try {
+ super.cleanup(context);
+ tryClosingConnection();
+ }
+
+ private void tryClosingConnection() {
+ if (connection != null) {
+ try {
connection.close();
} catch (SQLException e) {
- LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
- e.getMessage());
+ LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
}
- }
+ }
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index f4ecac2..2077137 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -96,6 +96,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
} catch (SQLException e) {
+ tryClosingResources();
throw new RuntimeException(e.getMessage());
}
}
@@ -170,17 +171,21 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
throw new RuntimeException(e);
} finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
- e.getMessage());
- }
- }
- if (writer != null) {
- writer.close();
+ tryClosingResources();
+ }
+ }
+
+ private void tryClosingResources() throws IOException {
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (SQLException e) {
+ LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
}
}
+ if (this.writer != null) {
+ this.writer.close();
+ }
}
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 1cac3db..1721d42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -305,18 +305,20 @@ public final class PhoenixConfigurationUtil {
}
final String tableName = getOutputTableName(configuration);
Preconditions.checkNotNull(tableName);
- final Connection connection = ConnectionUtil.getOutputConnection(configuration);
- List<String> upsertColumnList = PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
- if(!upsertColumnList.isEmpty()) {
- LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s, upsertColumnList=%s "
- ,!upsertColumnList.isEmpty(), upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)
- ));
- }
- columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
- // we put the encoded column infos in the Configuration for re usability.
- ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
- connection.close();
- return columnMetadataList;
+ try (final Connection connection = ConnectionUtil.getOutputConnection(configuration)) {
+ List<String> upsertColumnList =
+ PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
+ if(!upsertColumnList.isEmpty()) {
+ LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s,"
+ + " upsertColumnList=%s ",!upsertColumnList.isEmpty(),
+ upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)));
+ }
+ columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName,
+ upsertColumnList);
+ // we put the encoded column infos in the Configuration for re usability.
+ ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+ }
+ return columnMetadataList;
}
public static String getUpsertStatement(final Configuration configuration) throws SQLException {
@@ -363,12 +365,13 @@ public final class PhoenixConfigurationUtil {
if (tenantId != null) {
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
- final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
- final List<String> selectColumnList = getSelectColumnList(configuration);
- columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
- // we put the encoded column infos in the Configuration for re usability.
- ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
- connection.close();
+ try (final Connection connection = ConnectionUtil.getInputConnection(configuration, props)){
+ final List<String> selectColumnList = getSelectColumnList(configuration);
+ columnMetadataList =
+ PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+ // we put the encoded column infos in the Configuration for re usability.
+ ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+ }
return columnMetadataList;
}
@@ -403,9 +406,9 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
if(batchSize <= 0) {
- Connection conn = ConnectionUtil.getOutputConnection(configuration);
- batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
- conn.close();
+ try (Connection conn = ConnectionUtil.getOutputConnection(configuration)) {
+ batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+ }
}
configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
return batchSize;