You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/03/18 22:38:58 UTC

[phoenix] branch master updated: PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new f256004  PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
f256004 is described below

commit f256004ab45217eb736fda97b3d67cc77b183735
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Fri Mar 8 15:31:17 2019 -0800

    PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
---
 .../UngroupedAggregateRegionObserver.java          |  22 +++-
 .../hbase/index/write/RecoveryIndexWriter.java     |  30 ++++--
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    | 114 +++++++++++----------
 .../apache/phoenix/mapreduce/OrphanViewTool.java   |  73 ++++++++-----
 .../phoenix/mapreduce/PhoenixRecordWriter.java     |  18 +++-
 .../mapreduce/index/DirectHTableWriter.java        |  19 +++-
 .../mapreduce/index/IndexScrutinyMapper.java       |  25 ++++-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  85 +++++++++++----
 .../index/PhoenixIndexImportDirectMapper.java      |  26 +++--
 .../mapreduce/index/PhoenixIndexImportMapper.java  |  16 +--
 .../index/PhoenixIndexPartialBuildMapper.java      |  25 +++--
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  45 ++++----
 12 files changed, 325 insertions(+), 173 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 3be4d36..6b27a88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -29,6 +29,7 @@ import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CON
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -475,13 +477,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] deleteCF = null;
         byte[] emptyCF = null;
         Table targetHTable = null;
+        Connection targetHConn = null;
         boolean isPKChanging = false;
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (upsertSelectTable != null) {
             isUpsert = true;
             projectedTable = deserializeTable(upsertSelectTable);
-            targetHTable =
-                    ConnectionFactory.createConnection(upsertSelectConfig).getTable(
+            targetHConn = ConnectionFactory.createConnection(upsertSelectConfig);
+            targetHTable = targetHConn.getTable(
                         TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
@@ -852,9 +855,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 }
             }
             try {
-                if (targetHTable != null) {
-                    targetHTable.close();
-                }
+                tryClosingResourceSilently(targetHTable);
+                tryClosingResourceSilently(targetHConn);
             } finally {
                 try {
                     innerScanner.close();
@@ -900,6 +902,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     }
 
+    private static void tryClosingResourceSilently(Closeable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (IOException e) {
+                logger.error("Closing resource: " + res + " failed: ", e);
+            }
+        }
+    }
+
     private void checkForLocalIndexColumnFamilies(Region region,
             List<IndexMaintainer> indexMaintainers) throws IOException {
         TableDescriptor tableDesc = region.getTableDescriptor();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index a1a917c..fefb812 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -26,10 +26,9 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,17 +56,25 @@ public class RecoveryIndexWriter extends IndexWriter {
      * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
      * before calling.
      * 
-     * @param committer
      * @param policy
      * @param env
+     * @param name
      * @throws IOException
-     * @throws ZooKeeperConnectionException
-     * @throws MasterNotRunningException
      */
     public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
-            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+            throws IOException {
         super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
-        this.admin = ConnectionFactory.createConnection(env.getConfiguration()).getAdmin();
+        Connection hConn = null;
+        try {
+            hConn = ConnectionFactory.createConnection(env.getConfiguration());
+            this.admin = hConn.getAdmin();
+        } catch (Exception e) {
+            // Close the connection only if an exception occurs
+            if (hConn != null) {
+                hConn.close();
+            }
+            throw e;
+        }
     }
 
     @Override
@@ -124,10 +131,17 @@ public class RecoveryIndexWriter extends IndexWriter {
     public void stop(String why) {
         super.stop(why);
         if (admin != null) {
+            if (admin.getConnection() != null) {
+                try {
+                    admin.getConnection().close();
+                } catch (IOException e) {
+                    LOG.error("Closing the connection failed: ", e);
+                }
+            }
             try {
                 admin.close();
             } catch (IOException e) {
-                // closing silently
+                LOG.error("Closing the admin failed: ", e);
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index e321361..cc6feb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -291,57 +291,64 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         job.setOutputValueClass(KeyValue.class);
         job.setReducerClass(FormatToKeyValueReducer.class);
         byte[][] splitKeysBeforeJob = null;
-        org.apache.hadoop.hbase.client.Connection hbaseConn =
-                ConnectionFactory.createConnection(job.getConfiguration());
-        RegionLocator regionLocator = null;
-        if(hasLocalIndexes) {
-            try{
-                regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
-                splitKeysBeforeJob = regionLocator.getStartKeys();
-            } finally {
-                if(regionLocator != null )regionLocator.close();
-            }
-        }
-        MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
-
-        final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
-        final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
-
-        job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
-        job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
-
-        // give subclasses their hook
-        setupJob(job);
-
-        LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
-        boolean success = job.waitForCompletion(true);
-
-        if (success) {
-            if (hasLocalIndexes) {
-                try {
-                    regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName));
-                    if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) {
-                        LOG.error("The table "
-                                + qualifiedTableName
-                                + " has local indexes and there is split key mismatch before and"
-                                + " after running bulkload job. Please rerun the job otherwise"
-                                + " there may be inconsistencies between actual data and index data.");
-                        return -1;
-                    }
+        try(org.apache.hadoop.hbase.client.Connection hbaseConn =
+                ConnectionFactory.createConnection(job.getConfiguration())) {
+            RegionLocator regionLocator = null;
+            if(hasLocalIndexes) {
+                try{
+                    regionLocator = hbaseConn.getRegionLocator(
+                            TableName.valueOf(qualifiedTableName));
+                    splitKeysBeforeJob = regionLocator.getStartKeys();
                 } finally {
                     if (regionLocator != null) regionLocator.close();
                 }
             }
-            LOG.info("Loading HFiles from {}", outputPath);
-            completebulkload(conf,outputPath,tablesToBeLoaded);
-            LOG.info("Removing output directory {}", outputPath);
-            if(!outputPath.getFileSystem(conf).delete(outputPath, true)) {
-                LOG.error("Failed to delete the output directory {}", outputPath);
-            }
-            return 0;
-        } else {
-            return -1;
-        }
+            MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+
+            final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON
+                    .apply(tablesToBeLoaded);
+            final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON
+                    .apply(tablesToBeLoaded);
+
+            job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,
+                    tableNamesAsJson);
+            job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,
+                    logicalNamesAsJson);
+
+            // give subclasses their hook
+            setupJob(job);
+
+            LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
+            boolean success = job.waitForCompletion(true);
+
+            if (success) {
+                if (hasLocalIndexes) {
+                    try {
+                        regionLocator = hbaseConn.getRegionLocator(
+                                TableName.valueOf(qualifiedTableName));
+                        if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob,
+                                regionLocator.getStartKeys())) {
+                            LOG.error("The table " + qualifiedTableName + " has local indexes and"
+                                    + " there is split key mismatch before and after running"
+                                    + " bulkload job. Please rerun the job otherwise there may be"
+                                    + " inconsistencies between actual data and index data.");
+                            return -1;
+                        }
+                    } finally {
+                        if (regionLocator != null) regionLocator.close();
+                    }
+                }
+                LOG.info("Loading HFiles from {}", outputPath);
+                completebulkload(conf,outputPath,tablesToBeLoaded);
+                LOG.info("Removing output directory {}", outputPath);
+                if(!outputPath.getFileSystem(conf).delete(outputPath, true)) {
+                    LOG.error("Failed to delete the output directory {}", outputPath);
+                }
+                return 0;
+            } else {
+               return -1;
+           }
+       }
     }
 
     private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
@@ -354,11 +361,14 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
             String tableName = table.getPhysicalName();
             Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
-            org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(conf);
-            Table htable = hbaseConn.getTable(TableName.valueOf(tableName));
-            LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
-            loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable, hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
-            LOG.info("Incremental load complete for table=" + tableName);
+            try(org.apache.hadoop.hbase.client.Connection hbaseConn =
+                    ConnectionFactory.createConnection(conf);
+                    Table htable = hbaseConn.getTable(TableName.valueOf(tableName))) {
+                LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+                loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable,
+                        hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
+                LOG.info("Incremental load complete for table=" + tableName);
+            }
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
index 713fb05..fba01a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
@@ -393,27 +393,35 @@ public class OrphanViewTool extends Configured implements Tool {
         }
     }
 
-    private void gracefullyDropView(PhoenixConnection phoenixConnection, Configuration configuration,
-                          Key key) throws Exception {
-        PhoenixConnection tenantConnection;
-        if (key.getTenantId() != null) {
-            Properties tenantProps = new Properties();
-            tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
-            tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
-                    unwrap(PhoenixConnection.class);
-        } else {
-            tenantConnection = phoenixConnection;
-        }
-
-        MetaDataClient client = new MetaDataClient(tenantConnection);
-        org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
-                .create(key.getSchemaName(), key.getTableName());
+    private void gracefullyDropView(PhoenixConnection phoenixConnection,
+            Configuration configuration, Key key) throws Exception {
+        PhoenixConnection tenantConnection = null;
+        boolean newConn = false;
         try {
-            client.dropTable(
-                    new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
-        }
-        catch (TableNotFoundException e) {
-            LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+            if (key.getTenantId() != null) {
+                Properties tenantProps = new Properties();
+                tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
+                tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
+                        unwrap(PhoenixConnection.class);
+                newConn = true;
+            } else {
+                tenantConnection = phoenixConnection;
+            }
+
+            MetaDataClient client = new MetaDataClient(tenantConnection);
+            org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
+                    .create(key.getSchemaName(), key.getTableName());
+            try {
+                client.dropTable(
+                        new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
+            }
+            catch (TableNotFoundException e) {
+                LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+            }
+        } finally {
+            if (newConn) {
+                tryClosingConnection(tenantConnection);
+            }
         }
     }
 
@@ -775,14 +783,7 @@ public class OrphanViewTool extends Configured implements Tool {
     }
 
     private void closeConnectionAndFiles(Connection connection) throws IOException {
-        try {
-            if (connection != null) {
-                connection.close();
-            }
-        } catch (SQLException sqle) {
-            LOG.error("Failed to close connection ", sqle.getMessage());
-            throw new RuntimeException("Failed to close connection");
-        }
+        tryClosingConnection(connection);
         for (byte i = VIEW; i < ORPHAN_TYPE_COUNT; i++) {
             if (writer[i] != null) {
                 writer[i].close();
@@ -794,6 +795,22 @@ public class OrphanViewTool extends Configured implements Tool {
     }
 
     /**
+     * Try closing a connection if it is not null
+     * @param connection connection object
+     * @throws RuntimeException if closing the connection fails
+     */
+    private void tryClosingConnection(Connection connection) {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException sqlE) {
+            LOG.error("Failed to close connection: ", sqlE);
+            throw new RuntimeException("Failed to close connection with exception: ", sqlE);
+        }
+    }
+
+    /**
      * Examples for input arguments:
      * -c : cleans orphan views
      * -c -op /tmp/ : cleans orphan views and links, and logs their names to the files named Orphan*.txt in /tmp/
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 52f2fe3..b67ba74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -52,10 +52,20 @@ public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<Nul
     }
     
     public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
-        this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
-        this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
-        final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
-        this.statement = this.conn.prepareStatement(upsertQuery);
+        Connection connection = null;
+        try {
+            connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
+            this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.statement = connection.prepareStatement(upsertQuery);
+            this.conn = connection;
+        } catch (Exception e) {
+            // Only close the connection in case of an exception, so cannot use try-with-resources
+            if (connection != null) {
+                connection.close();
+            }
+            throw e;
+        }
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 6100b20..b85a049 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.mapreduce.index;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
@@ -44,8 +45,8 @@ public class DirectHTableWriter {
     private static final Logger LOG = LoggerFactory.getLogger(DirectHTableWriter.class);
 
     private Configuration conf = null;
-
     private Table table;
+    private Connection conn;
 
     public DirectHTableWriter(Configuration otherConf) {
         setConf(otherConf);
@@ -60,11 +61,12 @@ public class DirectHTableWriter {
         }
 
         try {
-            Connection conn = ConnectionFactory.createConnection(this.conf);
+            this.conn = ConnectionFactory.createConnection(this.conf);
             this.table = conn.getTable(TableName.valueOf(tableName));
             LOG.info("Created table instance for " + tableName);
         } catch (IOException e) {
             LOG.error("IOException : ", e);
+            tryClosingResourceSilently(this.conn);
             throw new RuntimeException(e);
         }
     }
@@ -99,7 +101,18 @@ public class DirectHTableWriter {
         return table;
     }
 
+    private void tryClosingResourceSilently(Closeable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (IOException e) {
+                LOG.error("Closing resource: " + res + " failed with error: ", e);
+            }
+        }
+    }
+
     public void close() throws IOException {
-        table.close();
+        tryClosingResourceSilently(this.table);
+        tryClosingResourceSilently(this.conn);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index c424787..98d6bac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import com.google.common.base.Strings;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Pair;
@@ -151,10 +150,23 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             LOG.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
         } catch (SQLException | NoSuchAlgorithmException e) {
+            tryClosingResourceSilently(this.outputUpsertStmt);
+            tryClosingResourceSilently(this.connection);
+            tryClosingResourceSilently(this.outputConn);
             throw new RuntimeException(e);
         }
     }
 
+    private static void tryClosingResourceSilently(AutoCloseable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (Exception e) {
+                LOG.error("Closing resource: " + res + " failed :", e);
+            }
+        }
+    }
+
     @Override
     protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
             throws IOException, InterruptedException {
@@ -182,18 +194,21 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         super.cleanup(context);
+        tryClosingResourceSilently(this.outputUpsertStmt);
+        IOException throwException = null;
         if (connection != null) {
             try {
                 processBatch(context);
                 connection.close();
-                if (outputConn != null) {
-                    outputConn.close();
-                }
             } catch (SQLException e) {
                 LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
-                throw new IOException(e);
+                throwException = new IOException(e);
             }
         }
+        tryClosingResourceSilently(this.outputConn);
+        if (throwException != null) {
+            throw throwException;
+        }
     }
 
     private void processBatch(Context context)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index ee2ae0b..bd1a310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -270,6 +270,12 @@ public class IndexTool extends Configured implements Tool {
 
         }
 
+        void closeConnection() throws SQLException {
+            if (this.connection != null) {
+                this.connection.close();
+            }
+        }
+
         public Job getJob() throws Exception {
             if (isPartialBuild) {
                 return configureJobForPartialBuild();
@@ -518,11 +524,13 @@ public class IndexTool extends Configured implements Tool {
             final Configuration configuration = job.getConfiguration();
             final String physicalIndexTable =
                     PhoenixConfigurationUtil.getPhysicalTableName(configuration);
-            org.apache.hadoop.hbase.client.Connection conn = ConnectionFactory.createConnection(configuration);
-            TableName tablename = TableName.valueOf(physicalIndexTable);
-            HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tablename),conn.getRegionLocator(tablename));
+            try(org.apache.hadoop.hbase.client.Connection conn =
+                    ConnectionFactory.createConnection(configuration)) {
+                TableName tablename = TableName.valueOf(physicalIndexTable);
+                HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tablename),
+                        conn.getRegionLocator(tablename));
+            }
             return job;
-               
         }
         
         /**
@@ -566,6 +574,8 @@ public class IndexTool extends Configured implements Tool {
         Connection connection = null;
         Table htable = null;
         RegionLocator regionLocator = null;
+        JobFactory jobFactory = null;
+        org.apache.hadoop.hbase.client.Connection hConn = null;
         try {
             CommandLine cmdLine = null;
             try {
@@ -580,13 +590,14 @@ public class IndexTool extends Configured implements Tool {
                 tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
                 configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
-            connection = ConnectionUtil.getInputConnection(configuration);
             schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
             dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
             indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
             qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+            try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
+                pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
+            }
             useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -612,8 +623,8 @@ public class IndexTool extends Configured implements Tool {
                 }
                 htable = connection.unwrap(PhoenixConnection.class).getQueryServices()
                         .getTable(pIndexTable.getPhysicalName().getBytes());
-                regionLocator =
-                        ConnectionFactory.createConnection(configuration).getRegionLocator(
+                hConn = ConnectionFactory.createConnection(configuration);
+                regionLocator = hConn.getRegionLocator(
                             TableName.valueOf(pIndexTable.getPhysicalName().getBytes()));
                 if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
                     isLocalIndexBuild = true;
@@ -641,7 +652,8 @@ public class IndexTool extends Configured implements Tool {
 				fs.delete(outputPath, true);
 			}
 
-			job = new JobFactory(connection, configuration, outputPath).getJob();
+            jobFactory = new JobFactory(connection, configuration, outputPath);
+            job = jobFactory.getJob();
 
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
@@ -675,19 +687,52 @@ public class IndexTool extends Configured implements Tool {
                     + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
             return -1;
         } finally {
+            boolean rethrowException = false;
             try {
                 if (connection != null) {
-                    connection.close();
+                    try {
+                        connection.close();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close connection ", e);
+                        rethrowException = true;
+                    }
                 }
                 if (htable != null) {
-                    htable.close();
+                    try {
+                        htable.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close htable ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (hConn != null) {
+                    try {
+                        hConn.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close hconnection ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (regionLocator != null) {
+                    try {
+                        regionLocator.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close regionLocator ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (jobFactory != null) {
+                    try {
+                        jobFactory.closeConnection();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close jobFactory ", e);
+                        rethrowException = true;
+                    }
                 }
-                if(regionLocator != null) {
-                    regionLocator.close();
+            } finally {
+                if (rethrowException) {
+                    throw new RuntimeException("Failed to close resource");
                 }
-            } catch (SQLException sqle) {
-                LOG.error("Failed to close connection ", sqle.getMessage());
-                throw new RuntimeException("Failed to close connection");
             }
         }
     }
@@ -695,11 +740,11 @@ public class IndexTool extends Configured implements Tool {
     private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration)
             throws SQLException, IOException, IllegalArgumentException, InterruptedException {
         int numRegions;
-        
 
-        try (RegionLocator regionLocator =
-                ConnectionFactory.createConnection(configuration).getRegionLocator(
-                    TableName.valueOf(qDataTable))) {
+        try (org.apache.hadoop.hbase.client.Connection tempHConn =
+                ConnectionFactory.createConnection(configuration);
+                RegionLocator regionLocator =
+                        tempHConn.getRegionLocator(TableName.valueOf(qDataTable))) {
             numRegions = regionLocator.getStartKeys().length;
             if (autosplit && !(numRegions > autosplitNumRegions)) {
                 LOG.info(String.format(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index e2ac491..e148f67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -105,7 +105,8 @@ public class PhoenixIndexImportDirectMapper extends
             final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
             this.pStatement = connection.prepareStatement(upsertQuery);
 
-        } catch (SQLException e) {
+        } catch (Exception e) {
+            tryClosingResources();
             throw new RuntimeException(e);
         }
     }
@@ -179,17 +180,20 @@ public class PhoenixIndexImportDirectMapper extends
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 8318adf..5253bfd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -94,6 +94,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
             this.pStatement = connection.prepareStatement(upsertQuery);
             
         } catch (SQLException e) {
+            tryClosingConnection();
             throw new RuntimeException(e.getMessage());
         } 
     }
@@ -162,14 +163,17 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-         super.cleanup(context);
-         if (connection != null) {
-             try {
+        super.cleanup(context);
+        tryClosingConnection();
+    }
+
+    private void tryClosingConnection() {
+        if (connection != null) {
+            try {
                 connection.close();
             } catch (SQLException e) {
-                LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
-         }
+        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 67ec62b..c79359d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -96,6 +96,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
             maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
         } catch (SQLException e) {
+            tryClosingResources();
             throw new RuntimeException(e.getMessage());
         } 
     }
@@ -170,17 +171,21 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 9d8e12e..f09cf0b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -329,18 +329,20 @@ public final class PhoenixConfigurationUtil {
         }
         final String tableName = getOutputTableName(configuration);
         Preconditions.checkNotNull(tableName);
-        final Connection connection = ConnectionUtil.getOutputConnection(configuration);
-        List<String> upsertColumnList = PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
-        if(!upsertColumnList.isEmpty()) {
-            LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s, upsertColumnList=%s "
-                    ,!upsertColumnList.isEmpty(), upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)
-                    ));
-        } 
-       columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
-       // we put the encoded column infos in the Configuration for re usability.
-       ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); 
-       connection.close();
-       return columnMetadataList;
+        try (final Connection connection = ConnectionUtil.getOutputConnection(configuration)) {
+            List<String> upsertColumnList =
+                    PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
+            if(!upsertColumnList.isEmpty()) {
+                LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s,"
+                                + " upsertColumnList=%s ",!upsertColumnList.isEmpty(),
+                        upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)));
+            }
+            columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName,
+                    upsertColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
+		return columnMetadataList;
     }
     
      public static String getUpsertStatement(final Configuration configuration) throws SQLException {
@@ -387,12 +389,13 @@ public final class PhoenixConfigurationUtil {
         if (tenantId != null) {
             props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
-        final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
-        final List<String> selectColumnList = getSelectColumnList(configuration);
-        columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
-        // we put the encoded column infos in the Configuration for re usability.
-        ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
-        connection.close();
+        try (final Connection connection = ConnectionUtil.getInputConnection(configuration, props)){
+            final List<String> selectColumnList = getSelectColumnList(configuration);
+            columnMetadataList =
+                    PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
         return columnMetadataList;
     }
 
@@ -428,9 +431,9 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
         if(batchSize <= 0) {
-           Connection conn = ConnectionUtil.getOutputConnection(configuration);
-           batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
-           conn.close();
+           try (Connection conn = ConnectionUtil.getOutputConnection(configuration)) {
+               batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+           }
         }
         configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
         return batchSize;