You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/03/18 22:38:58 UTC
[phoenix] branch master updated: PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new f256004 PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
f256004 is described below
commit f256004ab45217eb736fda97b3d67cc77b183735
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Fri Mar 8 15:31:17 2019 -0800
PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
---
.../UngroupedAggregateRegionObserver.java | 22 +++-
.../hbase/index/write/RecoveryIndexWriter.java | 30 ++++--
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 114 +++++++++++----------
.../apache/phoenix/mapreduce/OrphanViewTool.java | 73 ++++++++-----
.../phoenix/mapreduce/PhoenixRecordWriter.java | 18 +++-
.../mapreduce/index/DirectHTableWriter.java | 19 +++-
.../mapreduce/index/IndexScrutinyMapper.java | 25 ++++-
.../apache/phoenix/mapreduce/index/IndexTool.java | 85 +++++++++++----
.../index/PhoenixIndexImportDirectMapper.java | 26 +++--
.../mapreduce/index/PhoenixIndexImportMapper.java | 16 +--
.../index/PhoenixIndexPartialBuildMapper.java | 25 +++--
.../mapreduce/util/PhoenixConfigurationUtil.java | 45 ++++----
12 files changed, 325 insertions(+), 173 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 3be4d36..6b27a88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -29,6 +29,7 @@ import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CON
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -475,13 +477,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
byte[] deleteCF = null;
byte[] emptyCF = null;
Table targetHTable = null;
+ Connection targetHConn = null;
boolean isPKChanging = false;
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
if (upsertSelectTable != null) {
isUpsert = true;
projectedTable = deserializeTable(upsertSelectTable);
- targetHTable =
- ConnectionFactory.createConnection(upsertSelectConfig).getTable(
+ targetHConn = ConnectionFactory.createConnection(upsertSelectConfig);
+ targetHTable = targetHConn.getTable(
TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
values = new byte[projectedTable.getPKColumns().size()][];
@@ -852,9 +855,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
try {
- if (targetHTable != null) {
- targetHTable.close();
- }
+ tryClosingResourceSilently(targetHTable);
+ tryClosingResourceSilently(targetHConn);
} finally {
try {
innerScanner.close();
@@ -900,6 +902,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
+ private static void tryClosingResourceSilently(Closeable res) {
+ if (res != null) {
+ try {
+ res.close();
+ } catch (IOException e) {
+ logger.error("Closing resource: " + res + " failed: ", e);
+ }
+ }
+ }
+
private void checkForLocalIndexColumnFamilies(Region region,
List<IndexMaintainer> indexMaintainers) throws IOException {
TableDescriptor tableDesc = region.getTableDescriptor();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index a1a917c..fefb812 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -26,10 +26,9 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,17 +56,25 @@ public class RecoveryIndexWriter extends IndexWriter {
* Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
* before calling.
*
- * @param committer
* @param policy
* @param env
+ * @param name
* @throws IOException
- * @throws ZooKeeperConnectionException
- * @throws MasterNotRunningException
*/
public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
- throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+ throws IOException {
super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
- this.admin = ConnectionFactory.createConnection(env.getConfiguration()).getAdmin();
+ Connection hConn = null;
+ try {
+ hConn = ConnectionFactory.createConnection(env.getConfiguration());
+ this.admin = hConn.getAdmin();
+ } catch (Exception e) {
+ // Close the connection only if an exception occurs
+ if (hConn != null) {
+ hConn.close();
+ }
+ throw e;
+ }
}
@Override
@@ -124,10 +131,17 @@ public class RecoveryIndexWriter extends IndexWriter {
public void stop(String why) {
super.stop(why);
if (admin != null) {
+ if (admin.getConnection() != null) {
+ try {
+ admin.getConnection().close();
+ } catch (IOException e) {
+ LOG.error("Closing the connection failed: ", e);
+ }
+ }
try {
admin.close();
} catch (IOException e) {
- // closing silently
+ LOG.error("Closing the admin failed: ", e);
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index e321361..cc6feb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -291,57 +291,64 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
job.setOutputValueClass(KeyValue.class);
job.setReducerClass(FormatToKeyValueReducer.class);
byte[][] splitKeysBeforeJob = null;
- org.apache.hadoop.hbase.client.Connection hbaseConn =
- ConnectionFactory.createConnection(job.getConfiguration());
- RegionLocator regionLocator = null;
- if(hasLocalIndexes) {
- try{
- regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
- splitKeysBeforeJob = regionLocator.getStartKeys();
- } finally {
- if(regionLocator != null )regionLocator.close();
- }
- }
- MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
-
- final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
- final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
-
- job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
- job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
-
- // give subclasses their hook
- setupJob(job);
-
- LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
- boolean success = job.waitForCompletion(true);
-
- if (success) {
- if (hasLocalIndexes) {
- try {
- regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
- if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) {
- LOG.error("The table "
- + qualifiedTableName
- + " has local indexes and there is split key mismatch before and"
- + " after running bulkload job. Please rerun the job otherwise"
- + " there may be inconsistencies between actual data and index data.");
- return -1;
- }
+ try(org.apache.hadoop.hbase.client.Connection hbaseConn =
+ ConnectionFactory.createConnection(job.getConfiguration())) {
+ RegionLocator regionLocator = null;
+ if(hasLocalIndexes) {
+ try{
+ regionLocator = hbaseConn.getRegionLocator(
+ TableName.valueOf(qualifiedTableName));
+ splitKeysBeforeJob = regionLocator.getStartKeys();
} finally {
if (regionLocator != null) regionLocator.close();
}
}
- LOG.info("Loading HFiles from {}", outputPath);
- completebulkload(conf,outputPath,tablesToBeLoaded);
- LOG.info("Removing output directory {}", outputPath);
- if(!outputPath.getFileSystem(conf).delete(outputPath, true)) {
- LOG.error("Failed to delete the output directory {}", outputPath);
- }
- return 0;
- } else {
- return -1;
- }
+ MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+
+ final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON
+ .apply(tablesToBeLoaded);
+ final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON
+ .apply(tablesToBeLoaded);
+
+ job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,
+ tableNamesAsJson);
+ job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,
+ logicalNamesAsJson);
+
+ // give subclasses their hook
+ setupJob(job);
+
+ LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
+ boolean success = job.waitForCompletion(true);
+
+ if (success) {
+ if (hasLocalIndexes) {
+ try {
+ regionLocator = hbaseConn.getRegionLocator(
+ TableName.valueOf(qualifiedTableName));
+ if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob,
+ regionLocator.getStartKeys())) {
+ LOG.error("The table " + qualifiedTableName + " has local indexes and"
+ + " there is split key mismatch before and after running"
+ + " bulkload job. Please rerun the job otherwise there may be"
+ + " inconsistencies between actual data and index data.");
+ return -1;
+ }
+ } finally {
+ if (regionLocator != null) regionLocator.close();
+ }
+ }
+ LOG.info("Loading HFiles from {}", outputPath);
+ completebulkload(conf,outputPath,tablesToBeLoaded);
+ LOG.info("Removing output directory {}", outputPath);
+ if(!outputPath.getFileSystem(conf).delete(outputPath, true)) {
+ LOG.error("Failed to delete the output directory {}", outputPath);
+ }
+ return 0;
+ } else {
+ return -1;
+ }
+ }
}
private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
@@ -354,11 +361,14 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String tableName = table.getPhysicalName();
Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
- org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(conf);
- Table htable = hbaseConn.getTable(TableName.valueOf(tableName));
- LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
- loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable, hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
- LOG.info("Incremental load complete for table=" + tableName);
+ try(org.apache.hadoop.hbase.client.Connection hbaseConn =
+ ConnectionFactory.createConnection(conf);
+ Table htable = hbaseConn.getTable(TableName.valueOf(tableName))) {
+ LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+ loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable,
+ hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
+ LOG.info("Incremental load complete for table=" + tableName);
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
index 713fb05..fba01a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
@@ -393,27 +393,35 @@ public class OrphanViewTool extends Configured implements Tool {
}
}
- private void gracefullyDropView(PhoenixConnection phoenixConnection, Configuration configuration,
- Key key) throws Exception {
- PhoenixConnection tenantConnection;
- if (key.getTenantId() != null) {
- Properties tenantProps = new Properties();
- tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
- tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
- unwrap(PhoenixConnection.class);
- } else {
- tenantConnection = phoenixConnection;
- }
-
- MetaDataClient client = new MetaDataClient(tenantConnection);
- org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
- .create(key.getSchemaName(), key.getTableName());
+ private void gracefullyDropView(PhoenixConnection phoenixConnection,
+ Configuration configuration, Key key) throws Exception {
+ PhoenixConnection tenantConnection = null;
+ boolean newConn = false;
try {
- client.dropTable(
- new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
- }
- catch (TableNotFoundException e) {
- LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+ if (key.getTenantId() != null) {
+ Properties tenantProps = new Properties();
+ tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
+ tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
+ unwrap(PhoenixConnection.class);
+ newConn = true;
+ } else {
+ tenantConnection = phoenixConnection;
+ }
+
+ MetaDataClient client = new MetaDataClient(tenantConnection);
+ org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
+ .create(key.getSchemaName(), key.getTableName());
+ try {
+ client.dropTable(
+ new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
+ }
+ catch (TableNotFoundException e) {
+ LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+ }
+ } finally {
+ if (newConn) {
+ tryClosingConnection(tenantConnection);
+ }
}
}
@@ -775,14 +783,7 @@ public class OrphanViewTool extends Configured implements Tool {
}
private void closeConnectionAndFiles(Connection connection) throws IOException {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (SQLException sqle) {
- LOG.error("Failed to close connection ", sqle.getMessage());
- throw new RuntimeException("Failed to close connection");
- }
+ tryClosingConnection(connection);
for (byte i = VIEW; i < ORPHAN_TYPE_COUNT; i++) {
if (writer[i] != null) {
writer[i].close();
@@ -794,6 +795,22 @@ public class OrphanViewTool extends Configured implements Tool {
}
/**
+ * Try closing a connection if it is not null
+ * @param connection connection object
+ * @throws RuntimeException if closing the connection fails
+ */
+ private void tryClosingConnection(Connection connection) {
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException sqlE) {
+ LOG.error("Failed to close connection: ", sqlE);
+ throw new RuntimeException("Failed to close connection with exception: ", sqlE);
+ }
+ }
+
+ /**
* Examples for input arguments:
* -c : cleans orphan views
* -c -op /tmp/ : cleans orphan views and links, and logs their names to the files named Orphan*.txt in /tmp/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 52f2fe3..b67ba74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -52,10 +52,20 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul
}
public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
- this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
- this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
- final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
- this.statement = this.conn.prepareStatement(upsertQuery);
+ Connection connection = null;
+ try {
+ connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
+ this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+ final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+ this.statement = connection.prepareStatement(upsertQuery);
+ this.conn = connection;
+ } catch (Exception e) {
+ // Only close the connection in case of an exception, so cannot use try-with-resources
+ if (connection != null) {
+ connection.close();
+ }
+ throw e;
+ }
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 6100b20..b85a049 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.mapreduce.index;
+import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -44,8 +45,8 @@ public class DirectHTableWriter {
private static final Logger LOG = LoggerFactory.getLogger(DirectHTableWriter.class);
private Configuration conf = null;
-
private Table table;
+ private Connection conn;
public DirectHTableWriter(Configuration otherConf) {
setConf(otherConf);
@@ -60,11 +61,12 @@ public class DirectHTableWriter {
}
try {
- Connection conn = ConnectionFactory.createConnection(this.conf);
+ this.conn = ConnectionFactory.createConnection(this.conf);
this.table = conn.getTable(TableName.valueOf(tableName));
LOG.info("Created table instance for " + tableName);
} catch (IOException e) {
LOG.error("IOException : ", e);
+ tryClosingResourceSilently(this.conn);
throw new RuntimeException(e);
}
}
@@ -99,7 +101,18 @@ public class DirectHTableWriter {
return table;
}
+ private void tryClosingResourceSilently(Closeable res) {
+ if (res != null) {
+ try {
+ res.close();
+ } catch (IOException e) {
+ LOG.error("Closing resource: " + res + " failed with error: ", e);
+ }
+ }
+ }
+
public void close() throws IOException {
- table.close();
+ tryClosingResourceSilently(this.table);
+ tryClosingResourceSilently(this.conn);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index c424787..98d6bac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import com.google.common.base.Strings;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Pair;
@@ -151,10 +150,23 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
LOG.info("Target table base query: " + targetTableQuery);
md5 = MessageDigest.getInstance("MD5");
} catch (SQLException | NoSuchAlgorithmException e) {
+ tryClosingResourceSilently(this.outputUpsertStmt);
+ tryClosingResourceSilently(this.connection);
+ tryClosingResourceSilently(this.outputConn);
throw new RuntimeException(e);
}
}
+ private static void tryClosingResourceSilently(AutoCloseable res) {
+ if (res != null) {
+ try {
+ res.close();
+ } catch (Exception e) {
+ LOG.error("Closing resource: " + res + " failed :", e);
+ }
+ }
+ }
+
@Override
protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
throws IOException, InterruptedException {
@@ -182,18 +194,21 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
+ tryClosingResourceSilently(this.outputUpsertStmt);
+ IOException throwException = null;
if (connection != null) {
try {
processBatch(context);
connection.close();
- if (outputConn != null) {
- outputConn.close();
- }
} catch (SQLException e) {
LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
- throw new IOException(e);
+ throwException = new IOException(e);
}
}
+ tryClosingResourceSilently(this.outputConn);
+ if (throwException != null) {
+ throw throwException;
+ }
}
private void processBatch(Context context)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index ee2ae0b..bd1a310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -270,6 +270,12 @@ public class IndexTool extends Configured implements Tool {
}
+ void closeConnection() throws SQLException {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
+
public Job getJob() throws Exception {
if (isPartialBuild) {
return configureJobForPartialBuild();
@@ -518,11 +524,13 @@ public class IndexTool extends Configured implements Tool {
final Configuration configuration = job.getConfiguration();
final String physicalIndexTable =
PhoenixConfigurationUtil.getPhysicalTableName(configuration);
- org.apache.hadoop.hbase.client.Connection conn = ConnectionFactory.createConnection(configuration);
- TableName tablename = TableName.valueOf(physicalIndexTable);
- HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tablename),conn.getRegionLocator(tablename));
+ try(org.apache.hadoop.hbase.client.Connection conn =
+ ConnectionFactory.createConnection(configuration)) {
+ TableName tablename = TableName.valueOf(physicalIndexTable);
+ HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tablename),
+ conn.getRegionLocator(tablename));
+ }
return job;
-
}
/**
@@ -566,6 +574,8 @@ public class IndexTool extends Configured implements Tool {
Connection connection = null;
Table htable = null;
RegionLocator regionLocator = null;
+ JobFactory jobFactory = null;
+ org.apache.hadoop.hbase.client.Connection hConn = null;
try {
CommandLine cmdLine = null;
try {
@@ -580,13 +590,14 @@ public class IndexTool extends Configured implements Tool {
tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
- connection = ConnectionUtil.getInputConnection(configuration);
schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+ try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
+ pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
+ }
useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -612,8 +623,8 @@ public class IndexTool extends Configured implements Tool {
}
htable = connection.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(pIndexTable.getPhysicalName().getBytes());
- regionLocator =
- ConnectionFactory.createConnection(configuration).getRegionLocator(
+ hConn = ConnectionFactory.createConnection(configuration);
+ regionLocator = hConn.getRegionLocator(
TableName.valueOf(pIndexTable.getPhysicalName().getBytes()));
if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
isLocalIndexBuild = true;
@@ -641,7 +652,8 @@ public class IndexTool extends Configured implements Tool {
fs.delete(outputPath, true);
}
- job = new JobFactory(connection, configuration, outputPath).getJob();
+ jobFactory = new JobFactory(connection, configuration, outputPath);
+ job = jobFactory.getJob();
if (!isForeground && useDirectApi) {
LOG.info("Running Index Build in Background - Submit async and exit");
@@ -675,19 +687,52 @@ public class IndexTool extends Configured implements Tool {
+ ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
return -1;
} finally {
+ boolean rethrowException = false;
try {
if (connection != null) {
- connection.close();
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.error("Failed to close connection ", e);
+ rethrowException = true;
+ }
}
if (htable != null) {
- htable.close();
+ try {
+ htable.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close htable ", e);
+ rethrowException = true;
+ }
+ }
+ if (hConn != null) {
+ try {
+ hConn.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close hconnection ", e);
+ rethrowException = true;
+ }
+ }
+ if (regionLocator != null) {
+ try {
+ regionLocator.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close regionLocator ", e);
+ rethrowException = true;
+ }
+ }
+ if (jobFactory != null) {
+ try {
+ jobFactory.closeConnection();
+ } catch (SQLException e) {
+ LOG.error("Failed to close jobFactory ", e);
+ rethrowException = true;
+ }
}
- if(regionLocator != null) {
- regionLocator.close();
+ } finally {
+ if (rethrowException) {
+ throw new RuntimeException("Failed to close resource");
}
- } catch (SQLException sqle) {
- LOG.error("Failed to close connection ", sqle.getMessage());
- throw new RuntimeException("Failed to close connection");
}
}
}
@@ -695,11 +740,11 @@ public class IndexTool extends Configured implements Tool {
private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration)
throws SQLException, IOException, IllegalArgumentException, InterruptedException {
int numRegions;
-
- try (RegionLocator regionLocator =
- ConnectionFactory.createConnection(configuration).getRegionLocator(
- TableName.valueOf(qDataTable))) {
+ try (org.apache.hadoop.hbase.client.Connection tempHConn =
+ ConnectionFactory.createConnection(configuration);
+ RegionLocator regionLocator =
+ tempHConn.getRegionLocator(TableName.valueOf(qDataTable))) {
numRegions = regionLocator.getStartKeys().length;
if (autosplit && !(numRegions > autosplitNumRegions)) {
LOG.info(String.format(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index e2ac491..e148f67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -105,7 +105,8 @@ public class PhoenixIndexImportDirectMapper extends
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.pStatement = connection.prepareStatement(upsertQuery);
- } catch (SQLException e) {
+ } catch (Exception e) {
+ tryClosingResources();
throw new RuntimeException(e);
}
}
@@ -179,17 +180,20 @@ public class PhoenixIndexImportDirectMapper extends
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
throw new RuntimeException(e);
} finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
- e.getMessage());
- }
- }
- if (writer != null) {
- writer.close();
+ tryClosingResources();
+ }
+ }
+
+ private void tryClosingResources() throws IOException {
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (SQLException e) {
+ LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
}
}
+ if (this.writer != null) {
+ this.writer.close();
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 8318adf..5253bfd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -94,6 +94,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
this.pStatement = connection.prepareStatement(upsertQuery);
} catch (SQLException e) {
+ tryClosingConnection();
throw new RuntimeException(e.getMessage());
}
}
@@ -162,14 +163,17 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
- super.cleanup(context);
- if (connection != null) {
- try {
+ super.cleanup(context);
+ tryClosingConnection();
+ }
+
+ private void tryClosingConnection() {
+ if (connection != null) {
+ try {
connection.close();
} catch (SQLException e) {
- LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
- e.getMessage());
+ LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
}
- }
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 67ec62b..c79359d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -96,6 +96,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
} catch (SQLException e) {
+ tryClosingResources();
throw new RuntimeException(e.getMessage());
}
}
@@ -170,17 +171,21 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
throw new RuntimeException(e);
} finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
- e.getMessage());
- }
- }
- if (writer != null) {
- writer.close();
+ tryClosingResources();
+ }
+ }
+
+ private void tryClosingResources() throws IOException {
+ if (this.connection != null) {
+ try {
+ this.connection.close();
+ } catch (SQLException e) {
+ LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
}
}
+ if (this.writer != null) {
+ this.writer.close();
+ }
}
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 9d8e12e..f09cf0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -329,18 +329,20 @@ public final class PhoenixConfigurationUtil {
}
final String tableName = getOutputTableName(configuration);
Preconditions.checkNotNull(tableName);
- final Connection connection = ConnectionUtil.getOutputConnection(configuration);
- List<String> upsertColumnList = PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
- if(!upsertColumnList.isEmpty()) {
- LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s, upsertColumnList=%s "
- ,!upsertColumnList.isEmpty(), upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)
- ));
- }
- columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
- // we put the encoded column infos in the Configuration for re usability.
- ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
- connection.close();
- return columnMetadataList;
+ try (final Connection connection = ConnectionUtil.getOutputConnection(configuration)) {
+ List<String> upsertColumnList =
+ PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
+ if(!upsertColumnList.isEmpty()) {
+ LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s,"
+ + " upsertColumnList=%s ",!upsertColumnList.isEmpty(),
+ upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)));
+ }
+ columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName,
+ upsertColumnList);
+ // we put the encoded column infos in the Configuration for re usability.
+ ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+ }
+ return columnMetadataList;
}
public static String getUpsertStatement(final Configuration configuration) throws SQLException {
@@ -387,12 +389,13 @@ public final class PhoenixConfigurationUtil {
if (tenantId != null) {
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
- final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
- final List<String> selectColumnList = getSelectColumnList(configuration);
- columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
- // we put the encoded column infos in the Configuration for re usability.
- ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
- connection.close();
+ try (final Connection connection = ConnectionUtil.getInputConnection(configuration, props)){
+ final List<String> selectColumnList = getSelectColumnList(configuration);
+ columnMetadataList =
+ PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+ // we put the encoded column infos in the Configuration for re usability.
+ ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+ }
return columnMetadataList;
}
@@ -428,9 +431,9 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
if(batchSize <= 0) {
- Connection conn = ConnectionUtil.getOutputConnection(configuration);
- batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
- conn.close();
+ try (Connection conn = ConnectionUtil.getOutputConnection(configuration)) {
+ batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+ }
}
configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
return batchSize;