Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/03/18 22:40:28 UTC

[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new 2e909cc  PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
2e909cc is described below

commit 2e909cce32e26ccfa4981d42c651ba69b96b0126
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Thu Mar 14 23:16:14 2019 -0700

    PHOENIX-5184: HBase and Phoenix connection leaks in Indexing code path, OrphanViewTool and PhoenixConfigurationUtil
---
 .../UngroupedAggregateRegionObserver.java          |  6 +-
 .../hbase/index/write/RecoveryIndexWriter.java     | 10 +--
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    | 15 ++---
 .../apache/phoenix/mapreduce/OrphanViewTool.java   | 73 +++++++++++++---------
 .../phoenix/mapreduce/PhoenixRecordWriter.java     | 18 ++++--
 .../mapreduce/index/DirectHTableWriter.java        | 14 ++++-
 .../mapreduce/index/IndexScrutinyMapper.java       | 24 +++++--
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 55 +++++++++++-----
 .../index/PhoenixIndexImportDirectMapper.java      | 26 ++++----
 .../mapreduce/index/PhoenixIndexImportMapper.java  | 16 +++--
 .../index/PhoenixIndexPartialBuildMapper.java      | 25 +++++---
 .../mapreduce/util/PhoenixConfigurationUtil.java   | 45 ++++++-------
 12 files changed, 209 insertions(+), 118 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 5923a75..dc7567b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -834,7 +834,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
             try {
                 if (targetHTable != null) {
-                    targetHTable.close();
+                    try {
+                        targetHTable.close();
+                    } catch (IOException e) {
+                        logger.error("Closing table: " + targetHTable + " failed: ", e);
+                    }
                 }
             } finally {
                 try {
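
The hunk above wraps targetHTable.close() in its own try/catch so a failed close is logged instead of masking the cleanup that continues in the outer finally. A minimal standalone sketch of that shape, assuming nothing beyond java.io (the resource names are hypothetical stand-ins, not Phoenix classes):

    import java.io.Closeable;
    import java.io.IOException;

    class TwoResourceCleanup {
        // Close 'first' without letting its IOException prevent 'second' from closing.
        static void closeBoth(Closeable first, Closeable second) throws IOException {
            try {
                if (first != null) {
                    try {
                        first.close();
                    } catch (IOException e) {
                        // Log and continue; 'second' still gets its close attempt.
                        System.err.println("Closing first resource failed: " + e);
                    }
                }
            } finally {
                if (second != null) {
                    second.close(); // may still throw; the caller decides
                }
            }
        }
    }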
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index 35f0a6d..fb96666 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -26,8 +26,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -55,15 +53,13 @@ public class RecoveryIndexWriter extends IndexWriter {
      * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected to be fully setup
      * before calling.
      * 
-     * @param committer
      * @param policy
      * @param env
+     * @param name
      * @throws IOException
-     * @throws ZooKeeperConnectionException
-     * @throws MasterNotRunningException
      */
     public RecoveryIndexWriter(IndexFailurePolicy policy, RegionCoprocessorEnvironment env, String name)
-            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+            throws IOException {
         super(new TrackingParallelWriterIndexCommitter(), policy, env, name);
         this.admin = new HBaseAdmin(env.getConfiguration());
     }
@@ -125,7 +121,7 @@ public class RecoveryIndexWriter extends IndexWriter {
             try {
                 admin.close();
             } catch (IOException e) {
-                // closing silently
+                LOG.error("Closing the admin failed: ", e);
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 5252afb..8e18bf9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -37,19 +36,16 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
-import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -61,7 +57,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -69,7 +64,6 @@ import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
@@ -355,10 +349,11 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
             LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
             String tableName = table.getPhysicalName();
             Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
-            HTable htable = new HTable(conf,tableName);
-            LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
-            loader.doBulkLoad(tableOutputPath, htable);
-            LOG.info("Incremental load complete for table=" + tableName);
+            try(HTable htable = new HTable(conf,tableName)) {
+                LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+                loader.doBulkLoad(tableOutputPath, htable);
+                LOG.info("Incremental load complete for table=" + tableName);
+            }
         }
     }
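
The fix above moves the HTable into try-with-resources, so the table is closed even when doBulkLoad throws; before this change it was never closed at all. The same idiom in a self-contained sketch using only java.io (processLine is a hypothetical stand-in for the bulk-load call):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    class TryWithResourcesSketch {
        static void load(String path) throws IOException {
            // The reader is closed automatically, on normal exit and on exception.
            try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    processLine(line);
                }
            }
        }

        private static void processLine(String line) {
            // stand-in for the real per-record work
        }
    }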
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
index 713fb05..fba01a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/OrphanViewTool.java
@@ -393,27 +393,35 @@ public class OrphanViewTool extends Configured implements Tool {
         }
     }
 
-    private void gracefullyDropView(PhoenixConnection phoenixConnection, Configuration configuration,
-                          Key key) throws Exception {
-        PhoenixConnection tenantConnection;
-        if (key.getTenantId() != null) {
-            Properties tenantProps = new Properties();
-            tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
-            tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
-                    unwrap(PhoenixConnection.class);
-        } else {
-            tenantConnection = phoenixConnection;
-        }
-
-        MetaDataClient client = new MetaDataClient(tenantConnection);
-        org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
-                .create(key.getSchemaName(), key.getTableName());
+    private void gracefullyDropView(PhoenixConnection phoenixConnection,
+            Configuration configuration, Key key) throws Exception {
+        PhoenixConnection tenantConnection = null;
+        boolean newConn = false;
         try {
-            client.dropTable(
-                    new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
-        }
-        catch (TableNotFoundException e) {
-            LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+            if (key.getTenantId() != null) {
+                Properties tenantProps = new Properties();
+                tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, key.getTenantId());
+                tenantConnection = ConnectionUtil.getInputConnection(configuration, tenantProps).
+                        unwrap(PhoenixConnection.class);
+                newConn = true;
+            } else {
+                tenantConnection = phoenixConnection;
+            }
+
+            MetaDataClient client = new MetaDataClient(tenantConnection);
+            org.apache.phoenix.parse.TableName pTableName = org.apache.phoenix.parse.TableName
+                    .create(key.getSchemaName(), key.getTableName());
+            try {
+                client.dropTable(
+                        new DropTableStatement(pTableName, PTableType.VIEW, false, true, true));
+            }
+            catch (TableNotFoundException e) {
+                LOG.info("Ignoring view " + pTableName + " as it has already been dropped");
+            }
+        } finally {
+            if (newConn) {
+                tryClosingConnection(tenantConnection);
+            }
         }
     }
 
@@ -775,14 +783,7 @@ public class OrphanViewTool extends Configured implements Tool {
     }
 
     private void closeConnectionAndFiles(Connection connection) throws IOException {
-        try {
-            if (connection != null) {
-                connection.close();
-            }
-        } catch (SQLException sqle) {
-            LOG.error("Failed to close connection ", sqle.getMessage());
-            throw new RuntimeException("Failed to close connection");
-        }
+        tryClosingConnection(connection);
         for (byte i = VIEW; i < ORPHAN_TYPE_COUNT; i++) {
             if (writer[i] != null) {
                 writer[i].close();
@@ -794,6 +795,22 @@ public class OrphanViewTool extends Configured implements Tool {
     }
 
     /**
+     * Try closing a connection if it is not null
+     * @param connection connection object
+     * @throws RuntimeException if closing the connection fails
+     */
+    private void tryClosingConnection(Connection connection) {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException sqlE) {
+            LOG.error("Failed to close connection: ", sqlE);
+            throw new RuntimeException("Failed to close connection with exception: ", sqlE);
+        }
+    }
+
+    /**
      * Examples for input arguments:
      * -c : cleans orphan views
      * -c -op /tmp/ : cleans orphan views and links, and logs their names to the files named Orphan*.txt in /tmp/
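
gracefullyDropView now tracks whether it opened the tenant connection itself (the newConn flag) and closes only connections it owns; the caller-supplied phoenixConnection is left open for the caller to manage. A sketch of that ownership-flag pattern with a plain java.sql.Connection (openTenantConnection is a hypothetical helper standing in for ConnectionUtil.getInputConnection):

    import java.sql.Connection;
    import java.sql.SQLException;

    class OwnershipFlagSketch {
        static void doWork(Connection shared, String tenantId) throws SQLException {
            Connection conn = null;
            boolean ownsConnection = false;
            try {
                if (tenantId != null) {
                    conn = openTenantConnection(tenantId); // opened here, closed here
                    ownsConnection = true;
                } else {
                    conn = shared; // borrowed; the caller closes it
                }
                // ... use conn ...
            } finally {
                // Close only what this method opened.
                if (ownsConnection && conn != null) {
                    conn.close();
                }
            }
        }

        private static Connection openTenantConnection(String tenantId) throws SQLException {
            throw new UnsupportedOperationException("hypothetical helper");
        }
    }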
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 52f2fe3..b67ba74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -52,10 +52,20 @@ public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<Nul
     }
     
     public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
-        this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
-        this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
-        final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
-        this.statement = this.conn.prepareStatement(upsertQuery);
+        Connection connection = null;
+        try {
+            connection = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
+            this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.statement = connection.prepareStatement(upsertQuery);
+            this.conn = connection;
+        } catch (Exception e) {
+            // Only close the connection in case of an exception, so cannot use try-with-resources
+            if (connection != null) {
+                connection.close();
+            }
+            throw e;
+        }
     }
 
     @Override
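
As the added comment notes, this constructor cannot use try-with-resources: on success the connection must outlive the constructor, so it is closed only when initialization fails partway. A minimal sketch of close-on-constructor-failure (the JDBC URL and SQL are illustrative only):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    class WriterSketch implements AutoCloseable {
        private final Connection conn;
        private final PreparedStatement stmt;

        WriterSketch(String url, String sql) throws SQLException {
            Connection c = null;
            try {
                c = DriverManager.getConnection(url);
                // If prepareStatement throws, the connection would otherwise leak.
                this.stmt = c.prepareStatement(sql);
                this.conn = c;
            } catch (Exception e) {
                if (c != null) {
                    c.close();
                }
                throw e; // precise rethrow: still seen as SQLException
            }
        }

        @Override
        public void close() throws SQLException {
            try {
                stmt.close();
            } finally {
                conn.close();
            }
        }
    }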
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 1230f25..323a98b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.mapreduce.index;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 
@@ -62,6 +63,7 @@ public class DirectHTableWriter {
             LOG.info("Created table instance for " + tableName);
         } catch (IOException e) {
             LOG.error("IOException : ", e);
+            tryClosingResourceSilently(this.table);
             throw new RuntimeException(e);
         }
     }
@@ -96,7 +98,17 @@ public class DirectHTableWriter {
         return table;
     }
 
+    private void tryClosingResourceSilently(Closeable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (IOException e) {
+                LOG.error("Closing resource: " + res + " failed with error: ", e);
+            }
+        }
+    }
+
     public void close() throws IOException {
-        table.close();
+        tryClosingResourceSilently(this.table);
     }
 }
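
The new tryClosingResourceSilently helper logs and swallows close failures, which suits both the constructor's error path and close() itself. A generic version of the helper (plain stderr stands in for the commons-logging LOG):

    import java.io.Closeable;
    import java.io.IOException;

    final class CloseUtil {
        private CloseUtil() {}

        // Null-safe close that logs instead of propagating IOException.
        static void closeSilently(Closeable res) {
            if (res != null) {
                try {
                    res.close();
                } catch (IOException e) {
                    System.err.println("Closing resource " + res + " failed: " + e);
                }
            }
        }
    }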
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 81081bf..c651077 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -149,10 +149,23 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
             LOG.info("Target table base query: " + targetTableQuery);
             md5 = MessageDigest.getInstance("MD5");
         } catch (SQLException | NoSuchAlgorithmException e) {
+            tryClosingResourceSilently(this.outputUpsertStmt);
+            tryClosingResourceSilently(this.connection);
+            tryClosingResourceSilently(this.outputConn);
             throw new RuntimeException(e);
         }
     }
 
+    private static void tryClosingResourceSilently(AutoCloseable res) {
+        if (res != null) {
+            try {
+                res.close();
+            } catch (Exception e) {
+                LOG.error("Closing resource: " + res + " failed :", e);
+            }
+        }
+    }
+
     @Override
     protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
             throws IOException, InterruptedException {
@@ -180,18 +193,21 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         super.cleanup(context);
+        tryClosingResourceSilently(this.outputUpsertStmt);
+        IOException throwException = null;
         if (connection != null) {
             try {
                 processBatch(context);
                 connection.close();
-                if (outputConn != null) {
-                    outputConn.close();
-                }
             } catch (SQLException e) {
                 LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
-                throw new IOException(e);
+                throwException = new IOException(e);
             }
         }
+        tryClosingResourceSilently(this.outputConn);
+        if (throwException != null) {
+            throw throwException;
+        }
     }
 
     private void processBatch(Context context)
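
The rewritten cleanup() closes the statement first, defers any SQLException from the main connection as an IOException, closes the output connection regardless, and only then rethrows, so every resource gets a close attempt. A sketch of that deferred-exception shape (flushing work before close stands in for processBatch):

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.SQLException;

    class DeferredCleanupSketch {
        static void cleanup(AutoCloseable stmt, Connection main, Connection output)
                throws IOException {
            closeQuietly(stmt);
            IOException deferred = null;
            if (main != null) {
                try {
                    // flush pending work here, then close
                    main.close();
                } catch (SQLException e) {
                    deferred = new IOException(e);
                }
            }
            closeQuietly(output);   // still closed even if 'main' failed
            if (deferred != null) {
                throw deferred;     // surface the failure only after all closes
            }
        }

        private static void closeQuietly(AutoCloseable res) {
            if (res != null) {
                try {
                    res.close();
                } catch (Exception e) {
                    System.err.println("Close failed: " + e);
                }
            }
        }
    }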
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index a71bc27..7791472 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -268,6 +268,12 @@ public class IndexTool extends Configured implements Tool {
 
         }
 
+        void closeConnection() throws SQLException {
+            if (this.connection != null) {
+                this.connection.close();
+            }
+        }
+
         public Job getJob() throws Exception {
             if (isPartialBuild) {
                 return configureJobForPartialBuild();
@@ -516,10 +522,10 @@ public class IndexTool extends Configured implements Tool {
             final Configuration configuration = job.getConfiguration();
             final String physicalIndexTable =
                     PhoenixConfigurationUtil.getPhysicalTableName(configuration);
-            final HTable htable = new HTable(configuration, physicalIndexTable);
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
+            try(final HTable htable = new HTable(configuration, physicalIndexTable)) {
+                HFileOutputFormat.configureIncrementalLoad(job, htable);
+            }
             return job;
-               
         }
         
         /**
@@ -527,9 +533,6 @@ public class IndexTool extends Configured implements Tool {
          * waits for the job completion based on runForeground parameter.
          * 
          * @param job
-         * @param outputPath
-         * @param runForeground - if true, waits for job completion, else submits and returns
-         *            immediately.
          * @return
          * @throws Exception
          */
@@ -562,6 +565,7 @@ public class IndexTool extends Configured implements Tool {
     public int run(String[] args) throws Exception {
         Connection connection = null;
         HTable htable = null;
+        JobFactory jobFactory = null;
         try {
             CommandLine cmdLine = null;
             try {
@@ -576,13 +580,14 @@ public class IndexTool extends Configured implements Tool {
                 tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
                 configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
             }
-            connection = ConnectionUtil.getInputConnection(configuration);
             schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
             dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
             indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
             qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+            try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
+                pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
+            }
             useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
@@ -609,7 +614,6 @@ public class IndexTool extends Configured implements Tool {
                 }
                 htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices()
                         .getTable(pIndexTable.getPhysicalName().getBytes());
-
                 if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
                     isLocalIndexBuild = true;
                     splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
@@ -637,7 +641,8 @@ public class IndexTool extends Configured implements Tool {
 				fs.delete(outputPath, true);
 			}
 
-			job = new JobFactory(connection, configuration, outputPath).getJob();
+            jobFactory = new JobFactory(connection, configuration, outputPath);
+            job = jobFactory.getJob();
 
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
@@ -670,16 +675,36 @@ public class IndexTool extends Configured implements Tool {
                     + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
             return -1;
         } finally {
+            boolean rethrowException = false;
             try {
                 if (connection != null) {
-                    connection.close();
+                    try {
+                        connection.close();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close connection ", e);
+                        rethrowException = true;
+                    }
                 }
                 if (htable != null) {
-                    htable.close();
+                    try {
+                        htable.close();
+                    } catch (IOException e) {
+                        LOG.error("Failed to close htable ", e);
+                        rethrowException = true;
+                    }
+                }
+                if (jobFactory != null) {
+                    try {
+                        jobFactory.closeConnection();
+                    } catch (SQLException e) {
+                        LOG.error("Failed to close jobFactory ", e);
+                        rethrowException = true;
+                    }
+                }
+            } finally {
+                if (rethrowException) {
+                    throw new RuntimeException("Failed to close resource");
                 }
-            } catch (SQLException sqle) {
-                LOG.error("Failed to close connection ", sqle.getMessage());
-                throw new RuntimeException("Failed to close connection");
             }
         }
     }
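
The rewritten finally in run() gives connection, htable, and the JobFactory's connection each an independent close attempt, records whether any failed, and throws a single RuntimeException at the end rather than aborting on the first failure. In sketch form (the three parameters are hypothetical stand-ins for those fields):

    import java.sql.Connection;
    import java.sql.SQLException;

    class CloseAllSketch {
        static void closeAll(Connection connection, AutoCloseable htable,
                AutoCloseable factory) {
            boolean rethrow = false;
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    rethrow = true; // remember the failure, keep closing
                }
            }
            if (htable != null) {
                try {
                    htable.close();
                } catch (Exception e) {
                    rethrow = true;
                }
            }
            if (factory != null) {
                try {
                    factory.close();
                } catch (Exception e) {
                    rethrow = true;
                }
            }
            if (rethrow) {
                throw new RuntimeException("Failed to close resource");
            }
        }
    }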
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index e2ac491..e148f67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -105,7 +105,8 @@ public class PhoenixIndexImportDirectMapper extends
             final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
             this.pStatement = connection.prepareStatement(upsertQuery);
 
-        } catch (SQLException e) {
+        } catch (Exception e) {
+            tryClosingResources();
             throw new RuntimeException(e);
         }
     }
@@ -179,17 +180,20 @@ public class PhoenixIndexImportDirectMapper extends
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
 }
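
This mapper's fix (mirrored in PhoenixIndexPartialBuildMapper below) factors the shutdown into one tryClosingResources() that runs from the setup() catch block as well as cleanup(), so an exception while preparing the upsert statement no longer leaks the connection. Sketched with generic JDBC fields (the URL and SQL are illustrative; the writer field is omitted for brevity):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    class MapperSketch {
        private Connection connection;
        private PreparedStatement stmt;

        void setup(String url, String sql) {
            try {
                connection = DriverManager.getConnection(url);
                stmt = connection.prepareStatement(sql);
            } catch (Exception e) {
                tryClosingResources(); // don't leak on a failed setup
                throw new RuntimeException(e);
            }
        }

        void cleanup() {
            tryClosingResources();     // same path on normal shutdown
        }

        private void tryClosingResources() {
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException e) {
                    System.err.println("Error while closing statement: " + e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    System.err.println("Error while closing connection: " + e);
                }
            }
        }
    }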
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 353de7a..88ddc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -94,6 +94,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
             this.pStatement = connection.prepareStatement(upsertQuery);
             
         } catch (SQLException e) {
+            tryClosingConnection();
             throw new RuntimeException(e.getMessage());
         } 
     }
@@ -162,14 +163,17 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
 
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
-         super.cleanup(context);
-         if (connection != null) {
-             try {
+        super.cleanup(context);
+        tryClosingConnection();
+    }
+
+    private void tryClosingConnection() {
+        if (connection != null) {
+            try {
                 connection.close();
             } catch (SQLException e) {
-                LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
-         }
+        }
     }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index f4ecac2..2077137 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -96,6 +96,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             this.mutations = Lists.newArrayListWithExpectedSize(batchSize);
             maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration));
         } catch (SQLException e) {
+            tryClosingResources();
             throw new RuntimeException(e.getMessage());
         } 
     }
@@ -170,17 +171,21 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
             context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
             throw new RuntimeException(e);
         } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",
-                        e.getMessage());
-                }
-            }
-            if (writer != null) {
-                writer.close();
+            tryClosingResources();
+        }
+    }
+
+    private void tryClosingResources() throws IOException {
+        if (this.connection != null) {
+            try {
+                this.connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
             }
         }
+        if (this.writer != null) {
+            this.writer.close();
+        }
     }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 1cac3db..1721d42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -305,18 +305,20 @@ public final class PhoenixConfigurationUtil {
         }
         final String tableName = getOutputTableName(configuration);
         Preconditions.checkNotNull(tableName);
-        final Connection connection = ConnectionUtil.getOutputConnection(configuration);
-        List<String> upsertColumnList = PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
-        if(!upsertColumnList.isEmpty()) {
-            LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s, upsertColumnList=%s "
-                    ,!upsertColumnList.isEmpty(), upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)
-                    ));
-        } 
-       columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
-       // we put the encoded column infos in the Configuration for re usability.
-       ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); 
-       connection.close();
-       return columnMetadataList;
+        try (final Connection connection = ConnectionUtil.getOutputConnection(configuration)) {
+            List<String> upsertColumnList =
+                    PhoenixConfigurationUtil.getUpsertColumnNames(configuration);
+            if(!upsertColumnList.isEmpty()) {
+                LOG.info(String.format("UseUpsertColumns=%s, upsertColumnList.size()=%s,"
+                                + " upsertColumnList=%s ",!upsertColumnList.isEmpty(),
+                        upsertColumnList.size(), Joiner.on(",").join(upsertColumnList)));
+            }
+            columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName,
+                    upsertColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
+		return columnMetadataList;
     }
     
      public static String getUpsertStatement(final Configuration configuration) throws SQLException {
@@ -363,12 +365,13 @@ public final class PhoenixConfigurationUtil {
         if (tenantId != null) {
             props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
-        final Connection connection = ConnectionUtil.getInputConnection(configuration, props);
-        final List<String> selectColumnList = getSelectColumnList(configuration);
-        columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
-        // we put the encoded column infos in the Configuration for re usability.
-        ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
-        connection.close();
+        try (final Connection connection = ConnectionUtil.getInputConnection(configuration, props)){
+            final List<String> selectColumnList = getSelectColumnList(configuration);
+            columnMetadataList =
+                    PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+            // we put the encoded column infos in the Configuration for re usability.
+            ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
+        }
         return columnMetadataList;
     }
 
@@ -403,9 +406,9 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
         if(batchSize <= 0) {
-           Connection conn = ConnectionUtil.getOutputConnection(configuration);
-           batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
-           conn.close();
+           try (Connection conn = ConnectionUtil.getOutputConnection(configuration)) {
+               batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+           }
         }
         configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
         return batchSize;
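
All three utility methods in PhoenixConfigurationUtil now scope their short-lived connection with try-with-resources instead of a trailing close() that was skipped whenever the body threw. The general shape with plain JDBC (URL and query are illustrative only):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    final class ConfigLookupSketch {
        private ConfigLookupSketch() {}

        // The connection lives only for the duration of the query and is
        // closed even if the query or result handling throws.
        static long fetchBatchSize(String url) throws SQLException {
            try (Connection conn = DriverManager.getConnection(url);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT batch_size FROM settings")) {
                return rs.next() ? rs.getLong(1) : 1000L;
            }
        }
    }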