You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/03/28 05:06:13 UTC

[phoenix] branch master updated: PHOENIX-5802: Connection leaks in UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator not being closed

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new affa9e8  PHOENIX-5802: Connection leaks in UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator not being closed
affa9e8 is described below

commit affa9e889efcc2ad7dac009a0d294b09447d281e
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Fri Mar 27 17:59:39 2020 -0700

    PHOENIX-5802: Connection leaks in UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator not being closed
---
 .../org/apache/phoenix/end2end/UpsertSelectIT.java | 2119 +++++++++++---------
 .../compile/MutatingParallelIteratorFactory.java   |  120 +-
 .../org/apache/phoenix/compile/UpsertCompiler.java |   32 +-
 .../phoenix/iterate/BaseResultIterators.java       |    1 -
 4 files changed, 1279 insertions(+), 993 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 41e2d3c..6f0f877 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.A_VALUE;
@@ -42,6 +43,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.Properties;
 
@@ -49,6 +51,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.GlobalMetric;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.types.PInteger;
@@ -57,6 +60,9 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -70,7 +76,22 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         this.allowServerSideMutations = allowServerSideMutations;
     }
 
-    @Parameters(name="UpsertSelecttIT_allowServerSideMutations={0}") // name is used by failsafe as file name in reports
+    @Before
+    public void setup() {
+        assertTrue(PhoenixRuntime.areGlobalClientMetricsBeingCollected());
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            m.reset();
+        }
+    }
+
+    @After
+    public void assertNoConnLeak() {
+        assertTrue(PhoenixRuntime.areGlobalClientMetricsBeingCollected());
+        assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
+    }
+
+    // name is used by failsafe as file name in reports
+    @Parameters(name="UpsertSelecttIT_allowServerSideMutations={0}")
     public static synchronized Object[] data() {
         return new Object[] {"true", "false"};
     }
@@ -81,17 +102,17 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
     }
     
     @Test
-    public void testUpsertSelecWithIndex() throws Exception {
+    public void testUpsertSelectWithIndex() throws Exception {
         testUpsertSelect(true, false);
     }
     
     @Test
-    public void testUpsertSelecWithIndexWithSalt() throws Exception {
+    public void testUpsertSelectWithIndexWithSalt() throws Exception {
         testUpsertSelect(true, true);
     }
 
     @Test
-    public void testUpsertSelecWithNoIndexWithSalt() throws Exception {
+    public void testUpsertSelectWithNoIndexWithSalt() throws Exception {
         testUpsertSelect(false, true);
     }
 
@@ -101,145 +122,163 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        String aTable = initATableValues(tenantId, saltTable ? null : splits, null, null, getUrl(), saltTable ? "salt_buckets = 2" : null);
+        String aTable = initATableValues(tenantId, saltTable ? null : splits, null,
+                null, getUrl(), saltTable ? "salt_buckets = 2" : null);
 
         String customEntityTable = generateUniqueName();
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        String ddl = "create table " + customEntityTable +
-                "   (organization_id char(15) not null, \n" +
-                "    key_prefix char(3) not null,\n" +
-                "    custom_entity_data_id char(12) not null,\n" +
-                "    created_by varchar,\n" +
-                "    created_date date,\n" +
-                "    currency_iso_code char(3),\n" +
-                "    deleted char(1),\n" +
-                "    division decimal(31,10),\n" +
-                "    last_activity date,\n" +
-                "    last_update date,\n" +
-                "    last_update_by varchar,\n" +
-                "    name varchar(240),\n" +
-                "    owner varchar,\n" +
-                "    record_type_id char(15),\n" +
-                "    setup_owner varchar,\n" +
-                "    system_modstamp date,\n" +
-                "    b.val0 varchar,\n" +
-                "    b.val1 varchar,\n" +
-                "    b.val2 varchar,\n" +
-                "    b.val3 varchar,\n" +
-                "    b.val4 varchar,\n" +
-                "    b.val5 varchar,\n" +
-                "    b.val6 varchar,\n" +
-                "    b.val7 varchar,\n" +
-                "    b.val8 varchar,\n" +
-                "    b.val9 varchar\n" +
-                "    CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) " + (saltTable ? "salt_buckets = 2"  : "");
-        conn.createStatement().execute(ddl);
-        conn.close();
-        
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            String ddl = "create table " + customEntityTable +
+                    "   (organization_id char(15) not null, \n" +
+                    "    key_prefix char(3) not null,\n" +
+                    "    custom_entity_data_id char(12) not null,\n" +
+                    "    created_by varchar,\n" +
+                    "    created_date date,\n" +
+                    "    currency_iso_code char(3),\n" +
+                    "    deleted char(1),\n" +
+                    "    division decimal(31,10),\n" +
+                    "    last_activity date,\n" +
+                    "    last_update date,\n" +
+                    "    last_update_by varchar,\n" +
+                    "    name varchar(240),\n" +
+                    "    owner varchar,\n" +
+                    "    record_type_id char(15),\n" +
+                    "    setup_owner varchar,\n" +
+                    "    system_modstamp date,\n" +
+                    "    b.val0 varchar,\n" +
+                    "    b.val1 varchar,\n" +
+                    "    b.val2 varchar,\n" +
+                    "    b.val3 varchar,\n" +
+                    "    b.val4 varchar,\n" +
+                    "    b.val5 varchar,\n" +
+                    "    b.val6 varchar,\n" +
+                    "    b.val7 varchar,\n" +
+                    "    b.val8 varchar,\n" +
+                    "    b.val9 varchar\n" +
+                    "    CONSTRAINT pk PRIMARY KEY " +
+                    "(organization_id, key_prefix, custom_entity_data_id)) " +
+                    (saltTable ? "salt_buckets = 2"  : "");
+            stmt.execute(ddl);
+        }
+
         String indexName = generateUniqueName();
         if (createIndex) {
-            conn = DriverManager.getConnection(getUrl(), props);
-            conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + aTable + "(a_string)" );
-            conn.close();
-        }
-        PreparedStatement upsertStmt;
-        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(3)); // Trigger multiple batches
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true);
-        String upsert = "UPSERT INTO " + customEntityTable + "(custom_entity_data_id, key_prefix, organization_id, created_by) " +
-            "SELECT substr(entity_id, 4), substr(entity_id, 1, 3), organization_id, a_string  FROM " + aTable + " WHERE ?=a_string";
-        if (createIndex) { // Confirm index is used
-            upsertStmt = conn.prepareStatement("EXPLAIN " + upsert);
-            upsertStmt.setString(1, tenantId);
-            ResultSet ers = upsertStmt.executeQuery();
-            assertTrue(ers.next());
-            String explainPlan = QueryUtil.getExplainPlan(ers);
-            assertTrue(explainPlan.contains(" SCAN OVER " + indexName));
+            try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                    Statement stmt = conn.createStatement()) {
+                stmt.execute("CREATE INDEX IF NOT EXISTS " + indexName +
+                        " ON " + aTable + "(a_string)" );
+            }
         }
-        
-        upsertStmt = conn.prepareStatement(upsert);
-        upsertStmt.setString(1, A_VALUE);
-        int rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(4, rowsInserted);
-        conn.commit();
-        conn.close();
-        
-        String query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by FROM " + customEntityTable + " WHERE organization_id = ? ";
-        conn = DriverManager.getConnection(getUrl(), props);
-        PreparedStatement statement = conn.prepareStatement(query);
-        statement.setString(1, tenantId);
-        ResultSet rs = statement.executeQuery();
-        
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("1", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
-        
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("2", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
-        
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("3", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
-        
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("4", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
+        // Trigger multiple batches
+        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String upsert = "UPSERT INTO " + customEntityTable +
+                    "(custom_entity_data_id, key_prefix, organization_id, created_by) " +
+                    "SELECT substr(entity_id, 4), substr(entity_id, 1, 3), organization_id, " +
+                    "a_string  FROM " + aTable + " WHERE ?=a_string";
+            if (createIndex) { // Confirm index is used
+                try (PreparedStatement upsertStmt =
+                        conn.prepareStatement("EXPLAIN " + upsert)) {
+                    upsertStmt.setString(1, tenantId);
+                    ResultSet ers = upsertStmt.executeQuery();
+                    assertTrue(ers.next());
+                    String explainPlan = QueryUtil.getExplainPlan(ers);
+                    assertTrue(explainPlan.contains(" SCAN OVER " + indexName));
+                }
+            }
+
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                upsertStmt.setString(1, A_VALUE);
+                int rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(4, rowsInserted);
+            }
+            conn.commit();
+        }
+
+        String query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by FROM " +
+                customEntityTable + " WHERE organization_id = ? ";
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            statement.setString(1, tenantId);
+            ResultSet rs = statement.executeQuery();
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("2", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("3", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("4", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
 
-        assertFalse(rs.next());
-        conn.close();
+            assertFalse(rs.next());
+        }
 
         // Test UPSERT through coprocessor
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true);
-        upsert = "UPSERT INTO " + customEntityTable + "(custom_entity_data_id, key_prefix, organization_id, last_update_by, division) " +
-            "SELECT custom_entity_data_id, key_prefix, organization_id, created_by, 1.0  FROM " + customEntityTable + " WHERE organization_id = ? and created_by >= 'a'";
-        
-        upsertStmt = conn.prepareStatement(upsert);
-        upsertStmt.setString(1, tenantId);
-        assertEquals(4, upsertStmt.executeUpdate());
-        conn.commit();
-
-        query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by, last_update_by, division FROM " + customEntityTable + " WHERE organization_id = ?";
-        conn = DriverManager.getConnection(getUrl(), props);
-        statement = conn.prepareStatement(query);
-        statement.setString(1, tenantId);
-        rs = statement.executeQuery();
-       
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("1", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
-        assertEquals(A_VALUE, rs.getString(4));
-        assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("2", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
-        assertEquals(A_VALUE, rs.getString(4));
-        assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("3", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
-        assertEquals(A_VALUE, rs.getString(4));
-        assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals("00A", rs.getString(1));
-        assertEquals("4", rs.getString(2));
-        assertEquals(A_VALUE, rs.getString(3));
-        assertEquals(A_VALUE, rs.getString(4));
-        assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-
-        assertFalse(rs.next());
-        conn.close();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String upsert = "UPSERT INTO " + customEntityTable +
+                    "(custom_entity_data_id, key_prefix, organization_id, last_update_by, " +
+                    "division) SELECT custom_entity_data_id, key_prefix, organization_id, " +
+                    "created_by, 1.0  FROM " + customEntityTable + " WHERE organization_id = ?" +
+                    " and created_by >= 'a'";
+
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                upsertStmt.setString(1, tenantId);
+                assertEquals(4, upsertStmt.executeUpdate());
+            }
+            conn.commit();
+        }
+
+        query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by, " +
+                "last_update_by, division FROM " + customEntityTable + " WHERE organization_id = ?";
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            statement.setString(1, tenantId);
+            ResultSet rs = statement.executeQuery();
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
+            assertEquals(A_VALUE, rs.getString(4));
+            assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("2", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
+            assertEquals(A_VALUE, rs.getString(4));
+            assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("3", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
+            assertEquals(A_VALUE, rs.getString(4));
+            assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals("00A", rs.getString(1));
+            assertEquals("4", rs.getString(2));
+            assertEquals(A_VALUE, rs.getString(3));
+            assertEquals(A_VALUE, rs.getString(4));
+            assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+            assertFalse(rs.next());
+        }
     }
 
     // TODO: more tests - nullable fixed length last PK column
@@ -252,158 +291,165 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
-            "SELECT current_date(), x_integer+2, entity_id FROM " + aTable + " WHERE a_integer >= ?";
-        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
-        upsertStmt.setInt(1, 6);
-        int rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(4, rowsInserted);
-        conn.commit();
-        conn.close();
-        
+        String upsert;
+        ResultSet rs;
+        int rowsInserted;
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
+                    "SELECT current_date(), x_integer+2, entity_id FROM " + aTable +
+                    " WHERE a_integer >= ?";
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                upsertStmt.setInt(1, 6);
+                rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(4, rowsInserted);
+            }
+            conn.commit();
+        }
+
         String query = "SELECT inst,host,\"DATE\",val FROM " + ptsdbTable;
-        conn = DriverManager.getConnection(getUrl(), props);
-        PreparedStatement statement = conn.prepareStatement(query);
-        ResultSet rs = statement.executeQuery();
-        
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+        PreparedStatement statement = conn.prepareStatement(query)) {
+            rs = statement.executeQuery();
+
+            Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW6, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertEquals(null, rs.getBigDecimal(4));
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW7, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW8, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW9, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertFalse(rs.next());
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, inst) " +
+                    "SELECT \"DATE\"+1, val*10, host FROM " + ptsdbTable;
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(4, rowsInserted);
+            }
+            conn.commit();
+        }
         Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW6, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertEquals(null, rs.getBigDecimal(4));
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW7, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW8, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW9, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
-
-        assertFalse(rs.next());
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true);
-        upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, inst) " +
-            "SELECT \"DATE\"+1, val*10, host FROM " + ptsdbTable;
-        upsertStmt = conn.prepareStatement(upsert);
-        rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(4, rowsInserted);
-        conn.commit();
-        conn.close();
-        
         Date then = new Date(now.getTime() + QueryConstants.MILLIS_IN_DAY);
         query = "SELECT host,inst, \"DATE\",val FROM " + ptsdbTable + " where inst is not null";
-        conn = DriverManager.getConnection(getUrl(), props);
-        statement = conn.prepareStatement(query);
-        
-        rs = statement.executeQuery();
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW6, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertEquals(null, rs.getBigDecimal(4));
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW7, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW8, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW9, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertFalse(rs.next());
-        conn.close();
-        
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+        PreparedStatement statement = conn.prepareStatement(query)) {
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW6, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertEquals(null, rs.getBigDecimal(4));
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW7, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW8, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW9, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertFalse(rs.next());
+        }
+
         // Should just update all values with the same value, essentially just updating the timestamp
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true);
-        upsert = "UPSERT INTO " + ptsdbTable + " SELECT * FROM " + ptsdbTable;
-        upsertStmt = conn.prepareStatement(upsert);
-        rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(8, rowsInserted);
-        conn.commit();
-        conn.close();
-        
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            upsert = "UPSERT INTO " + ptsdbTable + " SELECT * FROM " + ptsdbTable;
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(8, rowsInserted);
+            }
+            conn.commit();
+        }
+
         query = "SELECT * FROM " + ptsdbTable ;
-        conn = DriverManager.getConnection(getUrl(), props);
-        statement = conn.prepareStatement(query);
-        
-        rs = statement.executeQuery();
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW6, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertEquals(null, rs.getBigDecimal(4));
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW7, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW8, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(ROW9, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
-
-        assertTrue (rs.next());
-        assertEquals(ROW6, rs.getString(1));
-        assertEquals(null, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertEquals(null, rs.getBigDecimal(4));
-        
-        assertTrue (rs.next());
-        assertEquals(ROW7, rs.getString(1));
-        assertEquals(null, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(ROW8, rs.getString(1));
-        assertEquals(null, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(ROW9, rs.getString(1));
-        assertEquals(null, rs.getString(2));
-        assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
-        assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertFalse(rs.next());
-        conn.close();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW6, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertEquals(null, rs.getBigDecimal(4));
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW7, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW8, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(ROW9, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(ROW6, rs.getString(1));
+            assertEquals(null, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertEquals(null, rs.getBigDecimal(4));
+
+            assertTrue (rs.next());
+            assertEquals(ROW7, rs.getString(1));
+            assertEquals(null, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(ROW8, rs.getString(1));
+            assertEquals(null, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(ROW9, rs.getString(1));
+            assertEquals(null, rs.getString(2));
+            assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+            assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertFalse(rs.next());
+        }
     }
 
     @Test
@@ -424,245 +470,276 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(autoCommit);
-        String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
-            "SELECT current_date(), sum(a_integer), a_string FROM " + aTable + " GROUP BY a_string";
-        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
-        int rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(3, rowsInserted);
-        if (!autoCommit) {
-            conn.commit();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(autoCommit);
+            String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
+                    "SELECT current_date(), sum(a_integer), a_string FROM " + aTable +
+                    " GROUP BY a_string";
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                int rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(3, rowsInserted);
+            }
+            if (!autoCommit) {
+                conn.commit();
+            }
         }
-        conn.close();
         
         String query = "SELECT inst,host,\"DATE\",val FROM " + ptsdbTable;
-        conn = DriverManager.getConnection(getUrl(), props);
-        PreparedStatement statement = conn.prepareStatement(query);
-        ResultSet rs = statement.executeQuery();
-        Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(A_VALUE, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(10).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(B_VALUE, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
-        
-        assertTrue (rs.next());
-        assertEquals(null, rs.getString(1));
-        assertEquals(C_VALUE, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(9).compareTo(rs.getBigDecimal(4)) == 0);
-        assertFalse(rs.next());
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true);
-        upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host, inst) " +
-            "SELECT current_date(), max(val), max(host), 'x' FROM " + ptsdbTable;
-        upsertStmt = conn.prepareStatement(upsert);
-        rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        if (!autoCommit) {
-            conn.commit();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            ResultSet rs = statement.executeQuery();
+            Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(A_VALUE, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(10).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(B_VALUE, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
+
+            assertTrue (rs.next());
+            assertEquals(null, rs.getString(1));
+            assertEquals(C_VALUE, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(9).compareTo(rs.getBigDecimal(4)) == 0);
+            assertFalse(rs.next());
         }
-        conn.close();
-        
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host, inst) " +
+                    "SELECT current_date(), max(val), max(host), 'x' FROM " + ptsdbTable;
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                int rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(1, rowsInserted);
+            }
+            if (!autoCommit) {
+                conn.commit();
+            }
+        }
+
         query = "SELECT inst,host,\"DATE\",val FROM " + ptsdbTable + " WHERE inst='x'";
-        conn = DriverManager.getConnection(getUrl(), props);
-        statement = conn.prepareStatement(query);
-        rs = statement.executeQuery();
-        now = new Date(EnvironmentEdgeManager.currentTimeMillis());
-        
-        assertTrue (rs.next());
-        assertEquals("x", rs.getString(1));
-        assertEquals(C_VALUE, rs.getString(2));
-        assertTrue(rs.getDate(3).before(now) );
-        assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
-        assertFalse(rs.next());
-        
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            ResultSet rs = statement.executeQuery();
+            Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
+
+            assertTrue (rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals(C_VALUE, rs.getString(2));
+            assertTrue(rs.getDate(3).before(now) );
+            assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
+            assertFalse(rs.next());
+        }
     }
 
     @Test
     public void testUpsertSelectLongToInt() throws Exception {
-        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
-                PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+                PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+                PInteger.INSTANCE.toBytes(4)};
         String tableName = generateUniqueName();
         ensureTableCreated(getUrl(), tableName, "IntKeyTest", splits, null);
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
         String upsert = "UPSERT INTO " + tableName + " VALUES(1)";
-        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
-        int rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        conn.commit();
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
+        int rowsInserted;
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+            rowsInserted = upsertStmt.executeUpdate();
+            assertEquals(1, rowsInserted);
+            conn.commit();
+        }
+
         upsert = "UPSERT INTO " + tableName + "  select i+1 from " + tableName;
-        upsertStmt = conn.prepareStatement(upsert);
-        rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        conn.commit();
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+            rowsInserted = upsertStmt.executeUpdate();
+            assertEquals(1, rowsInserted);
+            conn.commit();
+        }
+
         String select = "SELECT i FROM " + tableName;
-        ResultSet rs = conn.createStatement().executeQuery(select);
-        assertTrue(rs.next());
-        assertEquals(1,rs.getInt(1));
-        assertTrue(rs.next());
-        assertEquals(2,rs.getInt(1));
-        assertFalse(rs.next());
-        conn.close();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery(select);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(2,rs.getInt(1));
+            assertFalse(rs.next());
+        }
     }
 
     @Test
     public void testUpsertSelectRunOnServer() throws Exception {
-        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
-                PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+                PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+                PInteger.INSTANCE.toBytes(4)};
         String tableName = generateUniqueName();
-        createTestTable(getUrl(), "create table " + tableName + " (i integer not null primary key desc, j integer)", splits, null);
+        createTestTable(getUrl(), "create table " + tableName +
+                " (i integer not null primary key desc, j integer)", splits, null);
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs;
+        int rowsInserted;
         String upsert = "UPSERT INTO " + tableName + " VALUES(1, 1)";
-        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
-        int rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        conn.commit();
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+            rowsInserted = upsertStmt.executeUpdate();
+            assertEquals(1, rowsInserted);
+            conn.commit();
+        }
+
         String select = "SELECT i,j+1 FROM " + tableName;
-        ResultSet rs = conn.createStatement().executeQuery(select);
-        assertTrue(rs.next());
-        assertEquals(1,rs.getInt(1));
-        assertEquals(2,rs.getInt(2));
-        assertFalse(rs.next());
-        conn.close();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true); // Force to run on server side.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery(select);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertEquals(2,rs.getInt(2));
+            assertFalse(rs.next());
+        }
+
         upsert = "UPSERT INTO " + tableName + "(i,j) select i, j+1 from " + tableName;
-        upsertStmt = conn.prepareStatement(upsert);
-        rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true); // Force to run on server side.
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(1, rowsInserted);
+            }
+        }
+
         select = "SELECT j FROM " + tableName;
-        rs = conn.createStatement().executeQuery(select);
-        assertTrue(rs.next());
-        assertEquals(2,rs.getInt(1));
-        assertFalse(rs.next());
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true); // Force to run on server side.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery(select);
+            assertTrue(rs.next());
+            assertEquals(2,rs.getInt(1));
+            assertFalse(rs.next());
+        }
+
         upsert = "UPSERT INTO " + tableName + "(i,j) select i, i from " + tableName;
-        upsertStmt = conn.prepareStatement(upsert);
-        rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true); // Force to run on server side.
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(1, rowsInserted);
+            }
+        }
+
         select = "SELECT j FROM " + tableName;
-        rs = conn.createStatement().executeQuery(select);
-        assertTrue(rs.next());
-        assertEquals(1,rs.getInt(1));
-        assertFalse(rs.next());
-        conn.close();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery(select);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertFalse(rs.next());
+        }
     }
 
     @Test
     public void testUpsertSelectOnDescToAsc() throws Exception {
-        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
-                PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+                PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+                PInteger.INSTANCE.toBytes(4)};
         String tableName = generateUniqueName();
-        createTestTable(getUrl(), "create table " + tableName + " (i integer not null primary key desc, j integer)", splits, null);
+        createTestTable(getUrl(), "create table " + tableName +
+                " (i integer not null primary key desc, j integer)", splits, null);
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs;
+        int rowsInserted;
         String upsert = "UPSERT INTO " + tableName + " VALUES(1, 1)";
-        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
-        int rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        conn.commit();
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true); // Force to run on server side.
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+            rowsInserted = upsertStmt.executeUpdate();
+            assertEquals(1, rowsInserted);
+            conn.commit();
+        }
+
         upsert = "UPSERT INTO " + tableName + " (i,j) select i+1, j+1 from " + tableName;
-        upsertStmt = conn.prepareStatement(upsert);
-        rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(1, rowsInserted);
-        conn.commit();
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true); // Force to run on server side.
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(1, rowsInserted);
+            }
+            conn.commit();
+        }
+
         String select = "SELECT i,j FROM " + tableName;
-        ResultSet rs = conn.createStatement().executeQuery(select);
-        assertTrue(rs.next());
-        assertEquals(2,rs.getInt(1));
-        assertEquals(2,rs.getInt(2));
-        assertTrue(rs.next());
-        assertEquals(1,rs.getInt(1));
-        assertEquals(1,rs.getInt(2));
-        assertFalse(rs.next());
-        conn.close();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery(select);
+            assertTrue(rs.next());
+            assertEquals(2,rs.getInt(1));
+            assertEquals(2,rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertEquals(1,rs.getInt(2));
+            assertFalse(rs.next());
+        }
     }
 
     @Test
     public void testUpsertSelectRowKeyMutationOnSplitedTable() throws Exception {
-        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
-                PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+        byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+                PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+                PInteger.INSTANCE.toBytes(4)};
         String tableName = generateUniqueName();
         ensureTableCreated(getUrl(), tableName, "IntKeyTest", splits, null, null);
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        int rowsInserted;
+        ResultSet rs;
+
         String upsert = "UPSERT INTO " + tableName + " VALUES(?)";
-        PreparedStatement upsertStmt = conn.prepareStatement(upsert);
-        upsertStmt.setInt(1, 1);
-        upsertStmt.executeUpdate();
-        upsertStmt.setInt(1, 3);
-        upsertStmt.executeUpdate();
-        conn.commit();
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        // Normally this would force a server side update. But since this changes the PK column, it would
-        // for to run on the client side.
-        conn.setAutoCommit(true);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+            upsertStmt.setInt(1, 1);
+            upsertStmt.executeUpdate();
+            upsertStmt.setInt(1, 3);
+            upsertStmt.executeUpdate();
+            conn.commit();
+        }
+
         upsert = "UPSERT INTO " + tableName + " (i) SELECT i+1 from " + tableName;
-        upsertStmt = conn.prepareStatement(upsert);
-        int rowsInserted = upsertStmt.executeUpdate();
-        assertEquals(2, rowsInserted);
-        conn.commit();
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            // Normally this would force a server side update. But since this changes the
+            // PK column, it is forced to run on the client side.
+            conn.setAutoCommit(true);
+            try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+                rowsInserted = upsertStmt.executeUpdate();
+                assertEquals(2, rowsInserted);
+            }
+            conn.commit();
+        }
+
         String select = "SELECT i FROM " + tableName;
-        ResultSet rs = conn.createStatement().executeQuery(select);
-        assertTrue(rs.next());
-        assertEquals(1,rs.getInt(1));
-        assertTrue(rs.next());
-        assertTrue(rs.next());
-        assertTrue(rs.next());
-        assertEquals(4,rs.getInt(1));
-        assertFalse(rs.next());
-        conn.close();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery(select);
+            assertTrue(rs.next());
+            assertEquals(1,rs.getInt(1));
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertEquals(4,rs.getInt(1));
+            assertFalse(rs.next());
+        }
     }
     
     @Test
@@ -670,64 +747,75 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
         String tableName = generateUniqueName();
-        conn.createStatement().execute("create table " + tableName + " (id varchar(10) not null primary key, val varchar(10), ts timestamp)");
-        conn.close();
+        ResultSet rs;
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("create table " + tableName +
+                    " (id varchar(10) not null primary key, val varchar(10), ts timestamp)");
+        }
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("upsert into " + tableName + " values ('aaa', 'abc', current_date())");
-        conn.createStatement().execute("upsert into " + tableName + " values ('bbb', 'bcd', current_date())");
-        conn.createStatement().execute("upsert into " + tableName + " values ('ccc', 'cde', current_date())");
-        conn.commit();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + tableName +
+                    " values ('aaa', 'abc', current_date())");
+            stmt.execute("upsert into " + tableName +
+                    " values ('bbb', 'bcd', current_date())");
+            stmt.execute("upsert into " + tableName +
+                    " values ('ccc', 'cde', current_date())");
+            conn.commit();
+        }
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
-        
-        assertTrue(rs.next());
-        assertEquals("aaa",rs.getString(1));
-        assertEquals("abc",rs.getString(2));
-        assertNotNull(rs.getDate(3));
-        
-        assertTrue(rs.next());
-        assertEquals("bbb",rs.getString(1));
-        assertEquals("bcd",rs.getString(2));
-        assertNotNull(rs.getDate(3));
-        
-        assertTrue(rs.next());
-        assertEquals("ccc",rs.getString(1));
-        assertEquals("cde",rs.getString(2));
-        assertNotNull(rs.getDate(3));
-        
-        assertFalse(rs.next());
-        conn.close();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery("select * from " + tableName);
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("upsert into " + tableName + " (id, ts) select id, CAST(null AS timestamp) from " + tableName + " where id <= 'bbb' limit 1");
-        conn.commit();
-        conn.close();
+            assertTrue(rs.next());
+            assertEquals("aaa",rs.getString(1));
+            assertEquals("abc",rs.getString(2));
+            assertNotNull(rs.getDate(3));
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        rs = conn.createStatement().executeQuery("select * from " + tableName);
-        
-        assertTrue(rs.next());
-        assertEquals("aaa",rs.getString(1));
-        assertEquals("abc",rs.getString(2));
-        assertNull(rs.getDate(3));
-        
-        assertTrue(rs.next());
-        assertEquals("bbb",rs.getString(1));
-        assertEquals("bcd",rs.getString(2));
-        assertNotNull(rs.getDate(3));
-        
-        assertTrue(rs.next());
-        assertEquals("ccc",rs.getString(1));
-        assertEquals("cde",rs.getString(2));
-        assertNotNull(rs.getDate(3));
-        
-        assertFalse(rs.next());
-        conn.close();
+            assertTrue(rs.next());
+            assertEquals("bbb",rs.getString(1));
+            assertEquals("bcd",rs.getString(2));
+            assertNotNull(rs.getDate(3));
+
+            assertTrue(rs.next());
+            assertEquals("ccc",rs.getString(1));
+            assertEquals("cde",rs.getString(2));
+            assertNotNull(rs.getDate(3));
+
+            assertFalse(rs.next());
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + tableName +
+                    " (id, ts) select id, CAST(null AS timestamp) from " + tableName +
+                    " where id <= 'bbb' limit 1");
+            conn.commit();
+        }
 
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery("select * from " + tableName);
+            assertTrue(rs.next());
+            assertEquals("aaa",rs.getString(1));
+            assertEquals("abc",rs.getString(2));
+            assertNull(rs.getDate(3));
+
+            assertTrue(rs.next());
+            assertEquals("bbb",rs.getString(1));
+            assertEquals("bcd",rs.getString(2));
+            assertNotNull(rs.getDate(3));
+
+            assertTrue(rs.next());
+            assertEquals("ccc",rs.getString(1));
+            assertEquals("cde",rs.getString(2));
+            assertNotNull(rs.getDate(3));
+
+            assertFalse(rs.next());
+        }
     }
     
     @Test
@@ -735,42 +823,51 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
         String t1 = generateUniqueName();
         String t2 = generateUniqueName();
         String seq = generateUniqueName();
-        conn.createStatement().execute("create table  " + t1 + " (id bigint not null primary key, v varchar)");
-        conn.createStatement().execute("create table " + t2 + " (k varchar primary key)");
-        conn.createStatement().execute("create sequence " + seq);
-        conn.close();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("upsert into " + t2 + " values ('a')");
-        conn.createStatement().execute("upsert into " + t2 + " values ('b')");
-        conn.createStatement().execute("upsert into " + t2 + " values ('c')");
-        conn.commit();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("upsert into " + t1 + " select next value for  " + seq + " , k from " + t2);
-        conn.commit();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        ResultSet rs = conn.createStatement().executeQuery("select * from " + t1);
-        
-        assertTrue(rs.next());
-        assertEquals(1,rs.getLong(1));
-        assertEquals("a",rs.getString(2));
-        
-        assertTrue(rs.next());
-        assertEquals(2,rs.getLong(1));
-        assertEquals("b",rs.getString(2));
-        
-        assertTrue(rs.next());
-        assertEquals(3,rs.getLong(1));
-        assertEquals("c",rs.getString(2));
-        
-        assertFalse(rs.next());
-        conn.close();
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("create table  " + t1 +
+                    " (id bigint not null primary key, v varchar)");
+            stmt.execute("create table " + t2 + " (k varchar primary key)");
+            stmt.execute("create sequence " + seq);
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + t2 + " values ('a')");
+            stmt.execute("upsert into " + t2 + " values ('b')");
+            stmt.execute("upsert into " + t2 + " values ('c')");
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + t1 + " select next value for  " +
+                    seq + " , k from " + t2);
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("select * from " + t1);
+
+            assertTrue(rs.next());
+            assertEquals(1,rs.getLong(1));
+            assertEquals("a",rs.getString(2));
+
+            assertTrue(rs.next());
+            assertEquals(2,rs.getLong(1));
+            assertEquals("b",rs.getString(2));
+
+            assertTrue(rs.next());
+            assertEquals(3,rs.getLong(1));
+            assertEquals("c",rs.getString(2));
+
+            assertFalse(rs.next());
+        }
     }
     
     @Test
@@ -779,53 +876,61 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
         String t1 = generateUniqueName();
         String t2 = generateUniqueName();
-        String ddl = "CREATE TABLE IF NOT EXISTS " + t1 +  "("
-                + "ORGANIZATION_ID CHAR(15) NOT NULL, QUERY_ID CHAR(15) NOT NULL, CURSOR_ORDER BIGINT NOT NULL, K1 INTEGER, V1 INTEGER "
-                + "CONSTRAINT MAIN_PK PRIMARY KEY (ORGANIZATION_ID, QUERY_ID, CURSOR_ORDER) " + ") SALT_BUCKETS = 4";
-        conn.createStatement().execute(ddl);
-        conn.createStatement().execute(
-                "CREATE TABLE " + t2
-                        + "(ORGANIZATION_ID CHAR(15) NOT NULL, k1 integer NOT NULL, v1 integer NOT NULL "
-                        + "CONSTRAINT PK PRIMARY KEY (ORGANIZATION_ID, k1, v1) ) VERSIONS=1, SALT_BUCKETS = 4");
-        conn.createStatement().execute("create sequence s cache " + Integer.MAX_VALUE);
-        conn.close();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        for (int i = 0; i < numOfRecords; i++) {
-            conn.createStatement().execute(
-                    "UPSERT INTO " + t2 + " values ('00Dxx0000001gEH'," + i + "," + (i + 2) + ")");
-        }
-        conn.commit();
-        conn.close();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true);
-        conn.createStatement().execute(
-                    "UPSERT INTO " + t1 + " SELECT '00Dxx0000001gEH', 'MyQueryId', NEXT VALUE FOR S, k1, v1  FROM " + t2 + " ORDER BY K1, V1");
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + t1);
-
-        assertTrue(rs.next());
-        assertEquals(numOfRecords, rs.getLong(1));
-
-        ResultSet rs2 = conn.createStatement().executeQuery(
-                "select cursor_order, k1, v1 from " + t1 + " order by cursor_order");
-        long seq = 1;
-        while (rs2.next()) {
-            assertEquals(seq, rs2.getLong("cursor_order"));
-            // This value should be the sequence - 1 as we said order by k1 in the UPSERT...SELECT, but is not because
-            // of sequence processing.
-            assertEquals(seq - 1, rs2.getLong("k1"));
-            seq++;
-        }
-        // cleanup afrer ourselves
-        conn.createStatement().execute("drop sequence s");
-        conn.close();
+        String ddl = "CREATE TABLE IF NOT EXISTS " + t1 +  "(ORGANIZATION_ID CHAR(15) NOT NULL, " +
+                "QUERY_ID CHAR(15) NOT NULL, CURSOR_ORDER BIGINT NOT NULL, K1 INTEGER, " +
+                "V1 INTEGER " + "CONSTRAINT MAIN_PK PRIMARY KEY (ORGANIZATION_ID, QUERY_ID, " +
+                "CURSOR_ORDER) " + ") SALT_BUCKETS = 4";
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+            stmt.execute(
+                    "CREATE TABLE " + t2 + "(ORGANIZATION_ID CHAR(15) NOT NULL, k1 integer " +
+                            "NOT NULL, v1 integer NOT NULL CONSTRAINT PK PRIMARY KEY " +
+                            "(ORGANIZATION_ID, k1, v1) ) VERSIONS=1, SALT_BUCKETS = 4");
+            stmt.execute("create sequence s cache " + Integer.MAX_VALUE);
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            for (int i = 0; i < numOfRecords; i++) {
+                stmt.execute("UPSERT INTO " + t2 +
+                        " values ('00Dxx0000001gEH'," + i + "," + (i + 2) + ")");
+            }
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(true);
+            stmt.execute("UPSERT INTO " + t1 +
+                    " SELECT '00Dxx0000001gEH', 'MyQueryId', NEXT VALUE FOR S, k1, v1  FROM " +
+                    t2 + " ORDER BY K1, V1");
 
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            ResultSet rs = stmt.executeQuery("select count(*) from " + t1);
+
+            assertTrue(rs.next());
+            assertEquals(numOfRecords, rs.getLong(1));
+
+            ResultSet rs2 = stmt.executeQuery("select cursor_order, k1, v1 from " + t1 +
+                    " order by cursor_order");
+            long seq = 1;
+            while (rs2.next()) {
+                assertEquals(seq, rs2.getLong("cursor_order"));
+                // This value should be the sequence - 1 as we said order by k1 in the
+                // UPSERT...SELECT, but is not because of sequence processing.
+                assertEquals(seq - 1, rs2.getLong("k1"));
+                seq++;
+            }
+            // cleanup after ourselves
+            stmt.execute("drop sequence s");
+        }
     }
     
     @Test
@@ -833,10 +938,17 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         String t1 = generateUniqueName();
         String t2 = generateUniqueName();
         String t3 = generateUniqueName();
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + t1 + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
-            conn.createStatement().execute("CREATE TABLE " + t2 + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
-            conn.createStatement().execute("CREATE TABLE " + t3 + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE " + t1 +
+                    " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+                    "PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+            stmt.execute("CREATE TABLE " + t2 +
+                    " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+                    "PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+            stmt.execute("CREATE TABLE " + t3 + " (PK1 VARCHAR NOT NULL, " +
+                    "PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+                    "PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
         }
         
         // The timestamp of the put will be the value of the row_timestamp column.
@@ -845,8 +957,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + t1 + " (PK1, PK2, KV1) VALUES(?, ?, ?)");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + t1 +
+                " (PK1, PK2, KV1) VALUES(?, ?, ?)")) {
             stmt.setString(1, "PK1");
             stmt.setDate(2, rowTimestampDate);
             stmt.setString(3, "KV1");
@@ -854,53 +967,66 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             conn.commit();
         }
         
-        // Upsert select data into table T2. The connection needs to be at a timestamp beyond the row timestamp. Otherwise 
-        // it won't see the data from table T1.
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().executeUpdate("UPSERT INTO " + t2 + " SELECT * FROM " + t1);
+        // Upsert select data into table T2. The connection needs to be at a timestamp beyond the
+        // row timestamp. Otherwise it won't see the data from table T1.
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate("UPSERT INTO " + t2 + " SELECT * FROM " + t1);
             conn.commit();
-            // Verify the data upserted in T2. Note that we can use the same connection here because the data was
-            // inserted with a timestamp of rowTimestamp and the connection uses the latest timestamp
-            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t2 + " WHERE PK1 = ? AND PK2 = ?");
-            stmt.setString(1, "PK1");
-            stmt.setDate(2, rowTimestampDate);
-            ResultSet rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals("PK1", rs.getString("PK1"));
-            assertEquals(rowTimestampDate, rs.getDate("PK2"));
-            assertEquals("KV1", rs.getString("KV1"));
+            // Verify the data upserted in T2. Note that we can use the same connection here because
+            // the data was inserted with a timestamp of rowTimestamp and the connection uses
+            // the latest timestamp
+            try (PreparedStatement prepStmt = conn.prepareStatement("SELECT * FROM " + t2 +
+                    " WHERE PK1 = ? AND PK2 = ?")) {
+                prepStmt.setString(1, "PK1");
+                prepStmt.setDate(2, rowTimestampDate);
+                ResultSet rs = prepStmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals("PK1", rs.getString("PK1"));
+                assertEquals(rowTimestampDate, rs.getDate("PK2"));
+                assertEquals("KV1", rs.getString("KV1"));
+            }
+
         }
         
-        // Verify that you can't see the data in T2 if the connection is at a timestamp  lower than the row timestamp.
+        // Verify that you can't see the data in T2 if the connection is at a timestamp
+        // lower than the row timestamp.
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(rowTimestamp-1));
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t2 + " WHERE PK1 = ? AND PK2 = ?");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t2 +
+                " WHERE PK1 = ? AND PK2 = ?")) {
             stmt.setString(1, "PK1");
             stmt.setDate(2, rowTimestampDate);
             ResultSet rs = stmt.executeQuery();
             assertFalse(rs.next());
         }
         
-        // Upsert select data into table T3. The connection needs to be at a timestamp beyond the row timestamp. Otherwise 
-        // it won't see the data from table T1.
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().executeUpdate("UPSERT INTO " + t3 + " SELECT * FROM " + t1);
+        // Upsert select data into table T3. The connection needs to be at a timestamp beyond the
+        // row timestamp. Otherwise it won't see the data from table T1.
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate("UPSERT INTO " + t3 + " SELECT * FROM " + t1);
             conn.commit();
-            // Verify the data upserted in T3. Note that we can use the same connection here because the data was
-            // inserted with a timestamp of rowTimestamp and the connection  uses the latest timestamp
-            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t3 + " WHERE PK1 = ? AND PK2 = ?");
-            stmt.setString(1, "PK1");
-            stmt.setDate(2, rowTimestampDate);
-            ResultSet rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals("PK1", rs.getString("PK1"));
-            assertEquals(rowTimestampDate, rs.getDate("PK2"));
-            assertEquals("KV1", rs.getString("KV1"));
+            // Verify the data upserted in T3. Note that we can use the same connection here
+            // because the data was inserted with a timestamp of rowTimestamp and the connection
+            // uses the latest timestamp
+            try (PreparedStatement prepStmt = conn.prepareStatement("SELECT * FROM " + t3 +
+                    " WHERE PK1 = ? AND PK2 = ?")) {
+                prepStmt.setString(1, "PK1");
+                prepStmt.setDate(2, rowTimestampDate);
+                ResultSet rs = prepStmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals("PK1", rs.getString("PK1"));
+                assertEquals(rowTimestampDate, rs.getDate("PK2"));
+                assertEquals("KV1", rs.getString("KV1"));
+            }
         }
         
-        // Verify that you can't see the data in T2 if the connection is at next timestamp (which is lower than the row timestamp).
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t3 + " WHERE PK1 = ? AND PK2 = ?");
+        // Verify that you can't see the data in T3 if the connection is at next timestamp
+        // (which is lower than the row timestamp).
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t3 +
+                " WHERE PK1 = ? AND PK2 = ?")) {
             stmt.setString(1, "PK1");
             stmt.setDate(2, rowTimestampDate);
             ResultSet rs = stmt.executeQuery();
@@ -914,15 +1040,19 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE " + tableName +
+                    " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+                    "PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
         }
 
         // The timestamp of the put will be the value of the row_timestamp column.
         long rowTimestamp = 100;
         Date rowTimestampDate = new Date(rowTimestamp);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName + " (PK1, PK2, KV1) VALUES(?, ?, ?)");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName +
+                " (PK1, PK2, KV1) VALUES(?, ?, ?)")) {
             stmt.setInt(1, 1);
             stmt.setDate(2, rowTimestampDate);
             stmt.setString(3, "KV1");
@@ -930,21 +1060,26 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             conn.commit();
         }
         String seq = generateUniqueName();
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE SEQUENCE " + seq);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE SEQUENCE " + seq);
         }
-        // Upsert select data into table. The connection needs to be at a timestamp beyond the row timestamp. Otherwise 
-        // it won't see the data from table.
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + "  SELECT NEXT VALUE FOR " + seq + ", PK2 FROM  " + tableName);
+        // Upsert select data into table. The connection needs to be at a timestamp beyond the
+        // row timestamp. Otherwise it won't see the data from table.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate("UPSERT INTO  " + tableName +
+                    "  SELECT NEXT VALUE FOR " + seq + ", PK2 FROM  " + tableName);
             conn.commit();
         }
         
         // Upsert select using sequences.
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
             conn.setAutoCommit(true);
             for (int i = 0; i < 10; i++) {
-                int count = conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + "  SELECT NEXT VALUE FOR " + seq + ", PK2 FROM  " + tableName);
+                int count = stmt.executeUpdate("UPSERT INTO  " + tableName +
+                        "  SELECT NEXT VALUE FOR " + seq + ", PK2 FROM  " + tableName);
                 assertEquals((int)Math.pow(2, i), count);
             }
         }
@@ -955,19 +1090,27 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         String table1 = generateUniqueName();
         String table2 = generateUniqueName();
         String table3 = generateUniqueName();
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + table1 + " (T1PK1 VARCHAR NOT NULL, T1PK2 DATE NOT NULL, T1KV1 VARCHAR, T1KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T1PK1, T1PK2 DESC ROW_TIMESTAMP)) ");
-            conn.createStatement().execute("CREATE TABLE " + table2 + " (T2PK1 VARCHAR NOT NULL, T2PK2 DATE NOT NULL, T2KV1 VARCHAR, T2KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T2PK1, T2PK2 ROW_TIMESTAMP)) ");
-            conn.createStatement().execute("CREATE TABLE " + table3 + " (T3PK1 VARCHAR NOT NULL, T3PK2 DATE NOT NULL, T3KV1 VARCHAR, T3KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T3PK1, T3PK2 DESC ROW_TIMESTAMP)) ");
+        try (Connection conn = DriverManager.getConnection(getUrl());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE " + table1 +
+                    " (T1PK1 VARCHAR NOT NULL, T1PK2 DATE NOT NULL, T1KV1 VARCHAR, T1KV2 VARCHAR " +
+                    "CONSTRAINT PK PRIMARY KEY(T1PK1, T1PK2 DESC ROW_TIMESTAMP)) ");
+            stmt.execute("CREATE TABLE " + table2 +
+                    " (T2PK1 VARCHAR NOT NULL, T2PK2 DATE NOT NULL, T2KV1 VARCHAR, T2KV2 VARCHAR" +
+                    " CONSTRAINT PK PRIMARY KEY(T2PK1, T2PK2 ROW_TIMESTAMP)) ");
+            stmt.execute("CREATE TABLE " + table3 +
+                    " (T3PK1 VARCHAR NOT NULL, T3PK2 DATE NOT NULL, T3KV1 VARCHAR, T3KV2 VARCHAR" +
+                    " CONSTRAINT PK PRIMARY KEY(T3PK1, T3PK2 DESC ROW_TIMESTAMP)) ");
         }
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            // Upsert values where row_timestamp column PK2 is not set and the column names are specified
-            // This should upsert data with the value for PK2 as server timestamp
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table1 + " (T1PK1, T1KV1, T1KV2) VALUES (?, ?, ?)");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table1 +
+                " (T1PK1, T1KV1, T1KV2) VALUES (?, ?, ?)")) {
+            // Upsert values where row_timestamp column PK2 is not set and the column names
+            // are specified. This should upsert data with the value for PK2 as server timestamp
             stmt.setString(1, "PK1");
             stmt.setString(2, "KV1");
             stmt.setString(3, "KV2");
@@ -976,10 +1119,11 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         }
         long endTime = EnvironmentEdgeManager.currentTimeMillis();
         
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
-            // the data in this query.
-            PreparedStatement stmt = conn.prepareStatement("SELECT T1KV1, T1KV2 FROM " + table1 + " WHERE T1PK1 = ? AND T1PK2 >= ? AND T1PK2 <= ?");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("SELECT T1KV1, T1KV2 FROM " +
+                table1 + " WHERE T1PK1 = ? AND T1PK2 >= ? AND T1PK2 <= ?")) {
+            // Now query for data that was upserted above. If the row key was generated correctly
+            // then we should be able to see the data in this query.
             stmt.setString(1, "PK1");
             stmt.setDate(2, new Date(startTime));
             stmt.setDate(3, new Date(endTime));
@@ -991,18 +1135,21 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         }
         
         startTime = EnvironmentEdgeManager.currentTimeMillis();
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            // Upsert select into table2 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the server timestamp
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table2 + " (T2PK1, T2KV1, T2KV2) SELECT T1PK1, T1KV1, T1KV2 FROM " + table1);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table2 +
+                " (T2PK1, T2KV1, T2KV2) SELECT T1PK1, T1KV1, T1KV2 FROM " + table1)) {
+            // Upsert select into table2 by not selecting the row timestamp column. In this case,
+            // the rowtimestamp column would end up being set to the server timestamp
             stmt.executeUpdate();
             conn.commit();
         }
         endTime = EnvironmentEdgeManager.currentTimeMillis();
         
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
-            // the data in this query.
-            PreparedStatement stmt = conn.prepareStatement("SELECT T2KV1, T2KV2 FROM " + table2 + " WHERE T2PK1 = ? AND T2PK2 >= ?  AND T2PK2 <= ?");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("SELECT T2KV1, T2KV2 FROM " +
+                table2 + " WHERE T2PK1 = ? AND T2PK2 >= ?  AND T2PK2 <= ?")) {
+            // Now query for data that was upserted above. If the row key was generated correctly
+            // then we should be able to see the data in this query.
             stmt.setString(1, "PK1");
             stmt.setDate(2, new Date(startTime));
             stmt.setDate(3, new Date(endTime));
@@ -1014,18 +1161,21 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         }
         
         startTime = EnvironmentEdgeManager.currentTimeMillis();
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            // Upsert select into table3 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the server timestamp
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table3 + " (T3PK1, T3KV1, T3KV2) SELECT T2PK1, T2KV1, T2KV2 FROM " + table2);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table3 +
+                " (T3PK1, T3KV1, T3KV2) SELECT T2PK1, T2KV1, T2KV2 FROM " + table2)) {
+            // Upsert select into table3 by not selecting the row timestamp column. In this case,
+            // the rowtimestamp column would end up being set to the server timestamp
             stmt.executeUpdate();
             conn.commit();
         }
         endTime = EnvironmentEdgeManager.currentTimeMillis();
         
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
-            // the data in this query.
-            PreparedStatement stmt = conn.prepareStatement("SELECT T3KV1, T3KV2 FROM " + table3 + " WHERE T3PK1 = ? AND T3PK2 >= ? AND T3PK2 <= ?");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("SELECT T3KV1, T3KV2 FROM " +
+                table3 + " WHERE T3PK1 = ? AND T3PK2 >= ? AND T3PK2 <= ?")) {
+            // Now query for data that was upserted above. If the row key was generated correctly
+            // then we should be able to see the data in this query.
             stmt.setString(1, "PK1");
             stmt.setDate(2, new Date(startTime));
             stmt.setDate(3, new Date(endTime));
@@ -1044,9 +1194,14 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE TABLE " + tableName1 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP, PK3)) ");
-            conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP, PK3)) ");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE " + tableName1 +
+                    " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR" +
+                    " CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP, PK3)) ");
+            stmt.execute("CREATE TABLE " + tableName2 +
+                    " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR" +
+                    " CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP, PK3)) ");
         }
 
         String[] tableNames = {tableName1, tableName2};
@@ -1054,8 +1209,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             // Upsert data with the row timestamp value set
             long rowTimestamp1 = 100;
             Date rowTimestampDate = new Date(rowTimestamp1);
-            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)");
+            try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                    PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " +
+                            tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)")) {
                 stmt.setInt(1, 1);
                 stmt.setDate(2, rowTimestampDate);
                 stmt.setInt(3, 3);
@@ -1065,18 +1221,21 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             }
 
             long startTime = EnvironmentEdgeManager.currentTimeMillis();
-            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                    Statement stmt = conn.createStatement()) {
                 conn.setAutoCommit(true);
                 // Upsert select in the same table with the row_timestamp column PK2 not specified. 
                 // This will end up creating a new row whose timestamp is the server time stamp 
                 // which is also used for the row key
-                conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + " (PK1, PK3, KV1) SELECT PK1, PK3, KV1 FROM  " + tableName);
+                stmt.executeUpdate("UPSERT INTO  " + tableName +
+                        " (PK1, PK3, KV1) SELECT PK1, PK3, KV1 FROM  " + tableName);
             }
             long endTime = EnvironmentEdgeManager.currentTimeMillis();
             
-            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                    PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + tableName +
+                    " WHERE PK1 = ? AND PK2 >= ? AND PK2<= ? AND PK3 = ?")) {
                 // Verify the row that was upserted above
-                PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + tableName + " WHERE PK1 = ? AND PK2 >= ? AND PK2<= ? AND PK3 = ?");
                 stmt.setInt(1, 1);
                 stmt.setDate(2, new Date(startTime));
                 stmt.setDate(3, new Date(endTime));
@@ -1088,20 +1247,27 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
                 assertEquals("KV1", rs.getString("KV1"));
                 assertFalse(rs.next());
                 // Number of rows in the table should be 2.
-                rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
-                assertTrue(rs.next());
-                assertEquals(2, rs.getInt(1));
+                try (Statement newStmt = conn.createStatement()) {
+                    rs = newStmt.executeQuery("SELECT COUNT(*) FROM " + tableName);
+                    assertTrue(rs.next());
+                    assertEquals(2, rs.getInt(1));
+                }
 
             }
-            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                    Statement stmt = conn.createStatement()) {
                 conn.setAutoCommit(true);
-                // Upsert select in the same table with the row_timestamp column PK2 specified. This will not end up creating a new row
-                // because the destination pk columns, including the row timestamp column PK2, are the same as the source column.
-                conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + " (PK1, PK2, PK3, KV1) SELECT PK1, PK2, PK3, KV1 FROM  " + tableName);
+                // Upsert select in the same table with the row_timestamp column PK2 specified.
+                // This will not end up creating a new row because the destination pk columns,
+                // including the row timestamp column PK2, are the same as the source column.
+                stmt.executeUpdate("UPSERT INTO  " + tableName +
+                        " (PK1, PK2, PK3, KV1) SELECT PK1, PK2, PK3, KV1 FROM  " + tableName);
             }
-            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                // Verify that two rows were created. One with rowtimestamp1 and the other with rowtimestamp2
-                ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+            try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                    Statement stmt = conn.createStatement()) {
+                // Verify that two rows were created. One with rowtimestamp1 and the other
+                // with rowtimestamp2
+                ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + tableName);
                 assertTrue(rs.next());
                 assertEquals(2, rs.getInt(1));
                 assertFalse(rs.next());
@@ -1109,7 +1275,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             
         }
     }
-    
+
     @Test
     public void testRowTimestampColWithViewsIndexesAndSaltedTables() throws Exception {
         String baseTable = generateUniqueName();
@@ -1121,25 +1287,35 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE IMMUTABLE TABLE " + baseTable + " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 VARCHAR CONSTRAINT PK PRIMARY KEY(TENANT_ID, PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT = true, SALT_BUCKETS = 8");
-            conn.createStatement().execute("CREATE INDEX " + baseTableIdx + " ON " + baseTable + " (PK2, KV3) INCLUDE (KV1)");
-            conn.createStatement().execute("CREATE VIEW " + globalView + " AS SELECT * FROM " + baseTable + " WHERE KV1 = 'KV1'");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE IMMUTABLE TABLE " + baseTable +
+                    " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, " +
+                    "KV1 VARCHAR, KV2 VARCHAR, KV3 VARCHAR CONSTRAINT PK PRIMARY KEY(TENANT_ID, " +
+                    "PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT = true, SALT_BUCKETS = 8");
+            stmt.execute("CREATE INDEX " + baseTableIdx + " ON " +
+                    baseTable + " (PK2, KV3) INCLUDE (KV1)");
+            stmt.execute("CREATE VIEW " + globalView + " AS SELECT * FROM " +
+                    baseTable + " WHERE KV1 = 'KV1'");
         }
         
         String tenantId = "tenant1";
-        try (Connection conn = getTenantConnection(tenantId)) {
-            conn.createStatement().execute("CREATE VIEW " + tenantView + " AS SELECT * FROM " + baseTable);
-            conn.createStatement().execute("CREATE INDEX " + tenantViewIdx + " ON " + tenantView + " (PK2, KV2) INCLUDE (KV1)");
+        try (Connection conn = getTenantConnection(tenantId);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE VIEW " + tenantView + " AS SELECT * FROM " +
+                    baseTable);
+            stmt.execute("CREATE INDEX " + tenantViewIdx + " ON " +
+                    tenantView + " (PK2, KV2) INCLUDE (KV1)");
         }
 
         // upsert data into base table without specifying the row timestamp column PK2
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
-            // creating a new row whose timestamp is the latest timestamp (which will be used
-            // for the row key too)
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + baseTable + " (TENANT_ID, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + baseTable +
+                " (TENANT_ID, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)")) {
+            // Upsert into the base table with the row_timestamp column PK2 not specified.
+            // This will end up creating a new row whose timestamp is the latest timestamp
+            // (which will be used for the row key too)
             stmt.setString(1, tenantId);
             stmt.setInt(2, 3);
             stmt.setString(3, "KV1");
@@ -1150,52 +1326,62 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         }
         long endTime = EnvironmentEdgeManager.currentTimeMillis();
 
-        // Verify that we can see data when querying through base table, global view and index on the base table
+        // Verify that we can see data when querying through base table, global view and index on
+        // the base table
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             // Query the base table
-            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + baseTable + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?");
-            stmt.setString(1, tenantId);
-            stmt.setDate(2, new Date(startTime));
-            stmt.setDate(3, new Date(endTime));
-            stmt.setInt(4, 3);
-            ResultSet rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(tenantId, rs.getString("TENANT_ID"));
-            assertEquals("KV1", rs.getString("KV1"));
-            assertEquals("KV2", rs.getString("KV2"));
-            assertEquals("KV3", rs.getString("KV3"));
-            assertFalse(rs.next());
+            try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + baseTable +
+                    " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?")) {
+                stmt.setString(1, tenantId);
+                stmt.setDate(2, new Date(startTime));
+                stmt.setDate(3, new Date(endTime));
+                stmt.setInt(4, 3);
+                ResultSet rs = stmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(tenantId, rs.getString("TENANT_ID"));
+                assertEquals("KV1", rs.getString("KV1"));
+                assertEquals("KV2", rs.getString("KV2"));
+                assertEquals("KV3", rs.getString("KV3"));
+                assertFalse(rs.next());
+            }
 
             // Query the globalView
-            stmt = conn.prepareStatement("SELECT /*+ NO_INDEX */ * FROM  " + globalView + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?");
-            stmt.setString(1, tenantId);
-            stmt.setDate(2, new Date(startTime));
-            stmt.setDate(3, new Date(endTime));
-            stmt.setInt(4, 3);
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(tenantId, rs.getString("TENANT_ID"));
-            assertEquals("KV1", rs.getString("KV1"));
-            assertEquals("KV2", rs.getString("KV2"));
-            assertEquals("KV3", rs.getString("KV3"));
-            assertFalse(rs.next());
+            try (PreparedStatement stmt = conn.prepareStatement(
+                    "SELECT /*+ NO_INDEX */ * FROM  " + globalView +
+                            " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?")) {
+                stmt.setString(1, tenantId);
+                stmt.setDate(2, new Date(startTime));
+                stmt.setDate(3, new Date(endTime));
+                stmt.setInt(4, 3);
+                ResultSet rs = stmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(tenantId, rs.getString("TENANT_ID"));
+                assertEquals("KV1", rs.getString("KV1"));
+                assertEquals("KV2", rs.getString("KV2"));
+                assertEquals("KV3", rs.getString("KV3"));
+                assertFalse(rs.next());
+            }
+
             // Query using the index on base table
-            stmt = conn.prepareStatement("SELECT KV1 FROM  " + baseTable + " WHERE PK2 >= ? AND PK2 <= ? AND KV3 = ?");
-            stmt.setDate(1, new Date(startTime));
-            stmt.setDate(2, new Date(endTime));
-            stmt.setString(3, "KV3");
-            rs = stmt.executeQuery();
-            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
-            assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
-            assertTrue(rs.next());
-            assertEquals("KV1", rs.getString("KV1"));
-            assertFalse(rs.next());
+            try (PreparedStatement stmt = conn.prepareStatement("SELECT KV1 FROM  " +
+                    baseTable + " WHERE PK2 >= ? AND PK2 <= ? AND KV3 = ?")) {
+                stmt.setDate(1, new Date(startTime));
+                stmt.setDate(2, new Date(endTime));
+                stmt.setString(3, "KV3");
+                ResultSet rs = stmt.executeQuery();
+                QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+                assertEquals(plan.getTableRef().getTable().getName().getString(), baseTableIdx);
+                assertTrue(rs.next());
+                assertEquals("KV1", rs.getString("KV1"));
+                assertFalse(rs.next());
+            }
         }
 
         // Verify that data can be queried using tenant view and tenant view index
-        try (Connection tenantConn = getTenantConnection(tenantId)) {
+        try (Connection tenantConn = getTenantConnection(tenantId);
+                PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM  " +
+                tenantView + " WHERE PK2 >= ? AND PK2 <= ? AND PK3 = ?")) {
             // Query the tenant view
-            PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM  " + tenantView + " WHERE PK2 >= ? AND PK2 <= ? AND PK3 = ?");
             stmt.setDate(1, new Date(startTime));
             stmt.setDate(2, new Date(endTime));
             stmt.setInt(3, 3);
@@ -1208,7 +1394,8 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
 
             // Query using the index on the tenantView
             //TODO: uncomment the code after PHOENIX-2277 is fixed
-//            stmt = tenantConn.prepareStatement("SELECT KV1 FROM  " + tenantView + " WHERE PK2 = ? AND KV2 = ?");
+//            stmt = tenantConn.prepareStatement("SELECT KV1 FROM  " + tenantView +
+//                    " WHERE PK2 = ? AND KV2 = ?");
 //            stmt.setDate(1, new Date(upsertedTs));
 //            stmt.setString(2, "KV2");
 //            rs = stmt.executeQuery();
@@ -1223,74 +1410,88 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         try (Connection tenantConn = getTenantConnection(tenantId)) {
             // Upsert into tenant view where the row_timestamp column PK2 is not specified
             startTime = EnvironmentEdgeManager.currentTimeMillis();
-            PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO  " + tenantView + " (PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?)");
-            stmt.setInt(1, 33);
-            stmt.setString(2, "KV13");
-            stmt.setString(3, "KV23");
-            stmt.setString(4, "KV33");
-            stmt.executeUpdate();
+            try (PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO  " +
+                    tenantView + " (PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?)")) {
+                stmt.setInt(1, 33);
+                stmt.setString(2, "KV13");
+                stmt.setString(3, "KV23");
+                stmt.setString(4, "KV33");
+                stmt.executeUpdate();
+            }
             tenantConn.commit();
             upsertedTs = endTime = EnvironmentEdgeManager.currentTimeMillis();
+
             // Upsert into tenant view where the row_timestamp column PK2 is specified
-            stmt = tenantConn.prepareStatement("UPSERT INTO  " + tenantView + " (PK2, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
-            stmt.setDate(1, new Date(upsertedTs));
-            stmt.setInt(2, 44);
-            stmt.setString(3, "KV14");
-            stmt.setString(4, "KV24");
-            stmt.setString(5, "KV34");
-            stmt.executeUpdate();
+            try (PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO  " +
+                    tenantView + " (PK2, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)")) {
+                stmt.setDate(1, new Date(upsertedTs));
+                stmt.setInt(2, 44);
+                stmt.setString(3, "KV14");
+                stmt.setString(4, "KV24");
+                stmt.setString(5, "KV34");
+                stmt.executeUpdate();
+            }
             tenantConn.commit();
         }
 
-        // Verify that the data upserted using the tenant view can now be queried using base table and the base table index
+        // Verify that the data upserted using the tenant view can now be queried using base table
+        // and the base table index
         Date upsertedDate;
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             // Query the base table
-            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + baseTable + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ? ");
-            stmt.setString(1, tenantId);
-            stmt.setDate(2, new Date(startTime));
-            stmt.setDate(3, new Date(endTime));
-            stmt.setInt(4, 33);
-            ResultSet rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(tenantId, rs.getString("TENANT_ID"));
-            assertEquals("KV13", rs.getString("KV1"));
-            assertEquals("KV23", rs.getString("KV2"));
-            assertEquals("KV33", rs.getString("KV3"));
-            upsertedDate = rs.getDate("PK2");
-            assertFalse(rs.next());
-            
-            stmt = conn.prepareStatement("SELECT * FROM  " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ");
-            stmt.setString(1, tenantId);
-            stmt.setDate(2, new Date(upsertedTs));
-            stmt.setInt(3, 44);
-            rs = stmt.executeQuery();
-            assertTrue(rs.next());
-            assertEquals(tenantId, rs.getString("TENANT_ID"));
-            assertEquals("KV14", rs.getString("KV1"));
-            assertEquals("KV24", rs.getString("KV2"));
-            assertEquals("KV34", rs.getString("KV3"));
-            assertFalse(rs.next());
+            try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + baseTable +
+                    " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ? ")) {
+                stmt.setString(1, tenantId);
+                stmt.setDate(2, new Date(startTime));
+                stmt.setDate(3, new Date(endTime));
+                stmt.setInt(4, 33);
+                ResultSet rs = stmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(tenantId, rs.getString("TENANT_ID"));
+                assertEquals("KV13", rs.getString("KV1"));
+                assertEquals("KV23", rs.getString("KV2"));
+                assertEquals("KV33", rs.getString("KV3"));
+                upsertedDate = rs.getDate("PK2");
+                assertFalse(rs.next());
+            }
+
+            try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + baseTable +
+                    " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ")) {
+                stmt.setString(1, tenantId);
+                stmt.setDate(2, new Date(upsertedTs));
+                stmt.setInt(3, 44);
+                ResultSet rs = stmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(tenantId, rs.getString("TENANT_ID"));
+                assertEquals("KV14", rs.getString("KV1"));
+                assertEquals("KV24", rs.getString("KV2"));
+                assertEquals("KV34", rs.getString("KV3"));
+                assertFalse(rs.next());
+            }
+
             // Query using the index on base table
-            stmt = conn.prepareStatement("SELECT KV1 FROM  " + baseTable + " WHERE (PK2, KV3) IN ((?, ?), (?, ?)) ORDER BY KV1");
-            stmt.setDate(1, upsertedDate);
-            stmt.setString(2, "KV33");
-            stmt.setDate(3, new Date(upsertedTs));
-            stmt.setString(4, "KV34");
-            rs = stmt.executeQuery();
-            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
-            assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
-            assertTrue(rs.next());
-            assertEquals("KV13", rs.getString("KV1"));
-            assertTrue(rs.next());
-            assertEquals("KV14", rs.getString("KV1"));
-            assertFalse(rs.next());
+            try (PreparedStatement stmt = conn.prepareStatement("SELECT KV1 FROM  " + baseTable +
+                    " WHERE (PK2, KV3) IN ((?, ?), (?, ?)) ORDER BY KV1")) {
+                stmt.setDate(1, upsertedDate);
+                stmt.setString(2, "KV33");
+                stmt.setDate(3, new Date(upsertedTs));
+                stmt.setString(4, "KV34");
+                ResultSet rs = stmt.executeQuery();
+                QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+                assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
+                assertTrue(rs.next());
+                assertEquals("KV13", rs.getString("KV1"));
+                assertTrue(rs.next());
+                assertEquals("KV14", rs.getString("KV1"));
+                assertFalse(rs.next());
+            }
         }
         
         // Verify that the data upserted using the tenant view can now be queried using tenant view
-        try (Connection tenantConn = getTenantConnection(tenantId)) {
+        try (Connection tenantConn = getTenantConnection(tenantId);
+                PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM  " +
+                tenantView + " WHERE (PK2, PK3) IN ((?, ?), (?, ?)) ORDER BY KV1")) {
             // Query the base table
-            PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM  " + tenantView + " WHERE (PK2, PK3) IN ((?, ?), (?, ?)) ORDER BY KV1");
             stmt.setDate(1, upsertedDate);
             stmt.setInt(2, 33);
             stmt.setDate(3, new Date(upsertedTs));
@@ -1304,7 +1505,8 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             
             //TODO: uncomment the code after PHOENIX-2277 is fixed
 //            // Query using the index on the tenantView
-//            stmt = tenantConn.prepareStatement("SELECT KV1 FROM  " + tenantView + " WHERE (PK2, KV2) IN (?, ?, ?, ?) ORDER BY KV1");
+//            stmt = tenantConn.prepareStatement("SELECT KV1 FROM  " + tenantView +
+//                    " WHERE (PK2, KV2) IN (?, ?, ?, ?) ORDER BY KV1");
 //            stmt.setDate(1, new Date(upsertedTs));
 //            stmt.setString(2, "KV23");
 //            stmt.setDate(3, new Date(upsertedTs));
@@ -1327,20 +1529,25 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
-            conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE " + tableName +
+                    " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+            stmt.execute("CREATE TABLE " + tableName2 +
+                    " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
         }
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            long upsertedTs = 100;
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +  " VALUES (?, ?)");
+        long upsertedTs = 100;
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +
+                " VALUES (?, ?)")) {
             stmt.setLong(1, upsertedTs);
             stmt.setString(2, "KV1");
             stmt.executeUpdate();
             conn.commit();
         }
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName2 +  " SELECT (PK1 - 500), KV1 FROM " + tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName2 +
+                " SELECT (PK1 - 500), KV1 FROM " + tableName)) {
             stmt.executeUpdate();
             fail();
         } catch (SQLException e) {
@@ -1353,49 +1560,64 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
         String t1 = generateUniqueName();
-        conn.createStatement().execute(
-                "create table " + t1 + " (id bigint not null primary key, ca char(3)[])");
-        conn.close();
+        ResultSet rs;
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("upsert into " + t1 + " values (1, ARRAY['aaa', 'bbb'])");
-        conn.commit();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("create table " + t1 +
+                    " (id bigint not null primary key, ca char(3)[])");
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + t1 + " values (1, ARRAY['aaa', 'bbb'])");
+            conn.commit();
+        }
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute(
-                "upsert into " + t1 + " (id, ca) select id, ARRAY['ccc', 'ddd'] from " + t1 + " WHERE id = 1");
-        conn.commit();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + t1 + " (id, ca) select id, " +
+                    "ARRAY['ccc', 'ddd'] from " + t1 + " WHERE id = 1");
+            conn.commit();
+        }
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        ResultSet rs = conn.createStatement().executeQuery("select * from " + t1);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery("select * from " + t1);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getLong(1));
+            assertEquals("['ccc', 'ddd']", rs.getArray(2).toString());
 
-        assertTrue(rs.next());
-        assertEquals(1, rs.getLong(1));
-        assertEquals("['ccc', 'ddd']", rs.getArray(2).toString());
+        }
 
-        conn = DriverManager.getConnection(getUrl(), props);
         String t2 = generateUniqueName();
-        conn.createStatement().execute(
-                "create table " + t2 + " (id bigint not null primary key, ba binary(4)[])");
-        conn.close();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("upsert into " + t2 + " values (2, ARRAY[1, 27])");
-        conn.commit();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("create table " + t2 +
+                    " (id bigint not null primary key, ba binary(4)[])");
+        }
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute(
-                "upsert into " + t2 + " (id, ba) select id, ARRAY[54, 1024] from " + t2 + " WHERE id = 2");
-        conn.commit();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + t2 + " values (2, ARRAY[1, 27])");
+            conn.commit();
+        }
 
-        conn = DriverManager.getConnection(getUrl(), props);
-        rs = conn.createStatement().executeQuery("select * from " + t2);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute("upsert into " + t2 + " (id, ba) select id, " +
+                    "ARRAY[54, 1024] from " + t2 + " WHERE id = 2");
+            conn.commit();
+        }
 
-        assertTrue(rs.next());
-        assertEquals(2, rs.getLong(1));
-        assertEquals("[[128,0,0,54], [128,0,4,0]]", rs.getArray(2).toString());
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            rs = stmt.executeQuery("select * from " + t2);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getLong(1));
+            assertEquals("[[128,0,0,54], [128,0,4,0]]", rs.getArray(2).toString());
+        }
     }
 
     @Test
@@ -1412,37 +1634,45 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(autoCommit);
         String t1 = generateUniqueName();
-        conn.createStatement().execute(
-                "create table " + t1 + " (id bigint not null primary key, v varchar(20))");
-        conn.close();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(autoCommit);
-        conn.createStatement().execute("upsert into " + t1 + " values (1, 'foo')");
-        conn.commit();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(autoCommit);
-        conn.createStatement().execute(
-                "upsert into " + t1 + " (id, v) select id, '澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
-        conn.commit();
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(autoCommit);
-        ResultSet rs = conn.createStatement().executeQuery("select * from  " + t1);
-
-        assertTrue(rs.next());
-        assertEquals(1, rs.getLong(1));
-        assertEquals("澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉", rs.getString(2));
-
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(autoCommit);
-        try {
-            conn.createStatement().execute(
-                    "upsert into  " + t1 + " (id, v) select id, '澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(autoCommit);
+            stmt.execute("create table " + t1 +
+                    " (id bigint not null primary key, v varchar(20))");
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(autoCommit);
+            stmt.execute("upsert into " + t1 + " values (1, 'foo')");
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(autoCommit);
+            stmt.execute("upsert into " + t1 + " (id, v) select id, "
+                    + "'澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(autoCommit);
+            ResultSet rs = stmt.executeQuery("select * from  " + t1);
+
+            assertTrue(rs.next());
+            assertEquals(1, rs.getLong(1));
+            assertEquals("澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉", rs.getString(2));
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(autoCommit);
+            stmt.execute("upsert into  " + t1 + " (id, v) select id, "
+                    + "'澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
             conn.commit();
             fail();
         } catch (SQLException e) {
@@ -1456,28 +1686,40 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
         props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
-        props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS, allowServerSideMutations);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
+        props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
+                allowServerSideMutations);
         String t1 = generateUniqueName();
         String t2 = generateUniqueName();
         String seq = generateUniqueName();
-        conn.createStatement().execute("CREATE SEQUENCE " + seq);
-        conn.createStatement().execute("CREATE TABLE  " + t1 + "  (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4");
-        conn.createStatement().execute("CREATE TABLE  " + t2 + "  (pk INTEGER PRIMARY KEY, val INTEGER)");
-        conn.close();
-        
-        conn = DriverManager.getConnection(getUrl(), props);
-        for (int i = 0; i < 100; i++) {
-            conn.createStatement().execute("UPSERT INTO  " + t1 + "  VALUES (NEXT VALUE FOR " + seq + ", " + (i%10) + ")");
-        }
-        conn.commit();
-        conn.close();
-        conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(true);
-        int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO " + t2 + " SELECT pk, val FROM  " + t1);
-        assertEquals(100,upsertCount);
-        conn.close();
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            conn.setAutoCommit(false);
+
+            stmt.execute("CREATE SEQUENCE " + seq);
+            stmt.execute("CREATE TABLE  " + t1 +
+                    "  (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4");
+            stmt.execute("CREATE TABLE  " + t2 +
+                    "  (pk INTEGER PRIMARY KEY, val INTEGER)");
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
+            for (int i = 0; i < 100; i++) {
+                stmt.execute("UPSERT INTO  " + t1 +
+                        "  VALUES (NEXT VALUE FOR " + seq + ", " + (i%10) + ")");
+            }
+            conn.commit();
+        }
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            try (Statement stmt = conn.createStatement()) {
+                int upsertCount = stmt.executeUpdate("UPSERT INTO " + t2 +
+                        " SELECT pk, val FROM  " + t1);
+                assertEquals(100,upsertCount);
+            }
+        }
     }
 
     @Test // See https://issues.apache.org/jira/browse/PHOENIX-4265
@@ -1487,43 +1729,47 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            conn.createStatement().execute("CREATE IMMUTABLE TABLE " + tableName
-                    + " (k1 TIMESTAMP not null, k2 bigint not null, v bigint, constraint pk primary key (k1 row_timestamp, k2)) SALT_BUCKETS = 9");
-            conn.createStatement().execute(
-                "CREATE INDEX " + indexName + " ON " + tableName + " (v) INCLUDE (k2)");
-            PreparedStatement stmt =
-                    conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?, ?) ");
-            stmt.setTimestamp(1, new Timestamp(1000));
-            stmt.setLong(2, 2000);
-            stmt.setLong(3, 1000);
-            stmt.executeUpdate();
-            stmt.setTimestamp(1, new Timestamp(2000));
-            stmt.setLong(2, 5000);
-            stmt.setLong(3, 5);
-            stmt.executeUpdate();
-            stmt.setTimestamp(1, new Timestamp(3000));
-            stmt.setLong(2, 5000);
-            stmt.setLong(3, 5);
-            stmt.executeUpdate();
-            stmt.setTimestamp(1, new Timestamp(4000));
-            stmt.setLong(2, 5000);
-            stmt.setLong(3, 5);
-            stmt.executeUpdate();
-            stmt.setTimestamp(1, new Timestamp(5000));
-            stmt.setLong(2, 2000);
-            stmt.setLong(3, 10);
-            stmt.executeUpdate();
-            stmt.setTimestamp(1, new Timestamp(6000));
-            stmt.setLong(2, 2000);
-            stmt.setLong(3, 20);
-            stmt.executeUpdate();
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement statement = conn.createStatement()) {
+            statement.execute("CREATE IMMUTABLE TABLE " + tableName +
+                    " (k1 TIMESTAMP not null, k2 bigint not null, v bigint, constraint pk " +
+                    "primary key (k1 row_timestamp, k2)) SALT_BUCKETS = 9");
+            statement.execute("CREATE INDEX " + indexName + " ON " + tableName +
+                    " (v) INCLUDE (k2)");
+            try (PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +
+                    " VALUES (?, ?, ?) ")) {
+                stmt.setTimestamp(1, new Timestamp(1000));
+                stmt.setLong(2, 2000);
+                stmt.setLong(3, 1000);
+                stmt.executeUpdate();
+                stmt.setTimestamp(1, new Timestamp(2000));
+                stmt.setLong(2, 5000);
+                stmt.setLong(3, 5);
+                stmt.executeUpdate();
+                stmt.setTimestamp(1, new Timestamp(3000));
+                stmt.setLong(2, 5000);
+                stmt.setLong(3, 5);
+                stmt.executeUpdate();
+                stmt.setTimestamp(1, new Timestamp(4000));
+                stmt.setLong(2, 5000);
+                stmt.setLong(3, 5);
+                stmt.executeUpdate();
+                stmt.setTimestamp(1, new Timestamp(5000));
+                stmt.setLong(2, 2000);
+                stmt.setLong(3, 10);
+                stmt.executeUpdate();
+                stmt.setTimestamp(1, new Timestamp(6000));
+                stmt.setLong(2, 2000);
+                stmt.setLong(3, 20);
+                stmt.executeUpdate();
+            }
             conn.commit();
-            ResultSet rs = conn.createStatement().executeQuery("SELECT " +
+
+            ResultSet rs = statement.executeQuery("SELECT " +
                     " K2 FROM " + tableName + " WHERE V = 5");
             assertTrue("Index " + indexName + " should have been used",
-                rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getTableRef()
-                        .getTable().getName().getString().equals(indexName));
+                    rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getTableRef()
+                            .getTable().getName().getString().equals(indexName));
             assertTrue(rs.next());
             assertEquals(5000, rs.getLong("k2"));
             assertTrue(rs.next());
@@ -1531,12 +1777,12 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             assertTrue(rs.next());
             assertEquals(5000, rs.getLong("k2"));
             assertFalse(rs.next());
-            rs =
-                    conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName + " "
-                            + indexName + ") */ " + " K2 FROM " + tableName + " WHERE V = 5");
+
+            rs = statement.executeQuery("SELECT /*+ INDEX(" + tableName + " "
+                    + indexName + ") */ " + " K2 FROM " + tableName + " WHERE V = 5");
             assertTrue("Index " + indexName + " should have been used",
-                rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getTableRef()
-                        .getTable().getName().getString().equals(indexName));
+                    rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan()
+                            .getTableRef().getTable().getName().getString().equals(indexName));
             assertTrue(rs.next());
             assertEquals(5000, rs.getLong("k2"));
             assertTrue(rs.next());
@@ -1544,6 +1790,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             assertTrue(rs.next());
             assertEquals(5000, rs.getLong("k2"));
             assertFalse(rs.next());
+
         }
     }
 
@@ -1554,13 +1801,19 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         Properties props = new Properties();
         props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
             allowServerSideMutations);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Statement stmt = conn.createStatement()) {
             conn.setAutoCommit(true);
-            conn.createStatement().execute("create table " + tableName1 + "(name varchar(160) primary key, id varchar(120), address varchar(160))"); 
-            conn.createStatement().execute("create table " + tableName2 + "(name varchar(160) primary key, id varchar(10), address  varchar(10))");
-            conn.createStatement().execute("upsert into " + tableName1 + " values('test','test','test')");
-            conn.createStatement().execute("upsert into " + tableName2 + " select * from " + tableName1);
-            ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName2);
+            stmt.execute("create table " + tableName1 +
+                    "(name varchar(160) primary key, id varchar(120), address varchar(160))");
+            stmt.execute("create table " + tableName2 +
+                    "(name varchar(160) primary key, id varchar(10), address  varchar(10))");
+            stmt.execute("upsert into " + tableName1 +
+                    " values('test','test','test')");
+            stmt.execute("upsert into " + tableName2 + " select * from " +
+                    tableName1);
+            ResultSet rs = stmt.executeQuery("select * from " +
+                    tableName2);
             assertTrue(rs.next());
             assertEquals("test", rs.getString(1));
             assertEquals("test", rs.getString(2));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 0ce26cb..e7aae9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -36,11 +36,16 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Factory class used to instantiate an iterator to handle mutations made during a parallel scan.
  */
 public abstract class MutatingParallelIteratorFactory implements ParallelIteratorFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(
+            MutatingParallelIteratorFactory.class);
     protected final PhoenixConnection connection;
 
     protected MutatingParallelIteratorFactory(PhoenixConnection connection) {
@@ -50,62 +55,92 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
     /**
      * Method that does the actual mutation work
      */
-    abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+    abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator,
+            PhoenixConnection connection) throws SQLException;
     
+    /**
+     * Runs {@link #mutate} on a new connection created from this factory's connection and
+     * returns an iterator that yields a single tuple holding the resulting update count.
+     * Closing the returned iterator joins the child mutation state into this factory's
+     * connection and closes the new connection, so callers must always close it. If a
+     * failure occurs before the iterator is returned, the new connection is closed here
+     * and the failure is rethrown.
+     */
     @Override
-    public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan plan) throws SQLException {
+    public PeekingResultIterator newIterator(final StatementContext parentContext,
+            ResultIterator iterator, Scan scan, String tableName,
+            QueryPlan plan) throws SQLException {
+
         final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
-        
-        MutationState state = mutate(parentContext, iterator, clonedConnection);
-        
-        final long totalRowCount = state.getUpdateCount();
-        final boolean autoFlush = connection.getAutoCommit() || plan.getTableRef().getTable().isTransactional();
-        if (autoFlush) {
-            clonedConnection.getMutationState().join(state);
-            state = clonedConnection.getMutationState();
-        }
-        final MutationState finalState = state;
-        
-        byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
-        Cell keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
-        final Tuple tuple = new SingleKeyValueTuple(keyValue);
-        return new PeekingResultIterator() {
-            private boolean done = false;
-            
-            @Override
-            public Tuple next() throws SQLException {
-                if (done) {
-                    return null;
-                }
-                done = true;
-                return tuple;
-            }
+        try {
+            MutationState state = mutate(parentContext, iterator, clonedConnection);
 
-            @Override
-            public void explain(List<String> planSteps) {
+            final long totalRowCount = state.getUpdateCount();
+            final boolean autoFlush = connection.getAutoCommit() ||
+                    plan.getTableRef().getTable().isTransactional();
+            if (autoFlush) {
+                clonedConnection.getMutationState().join(state);
+                state = clonedConnection.getMutationState();
             }
+            final MutationState finalState = state;
+
+            byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
+            Cell keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+                    SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+            final Tuple tuple = new SingleKeyValueTuple(keyValue);
+            return new PeekingResultIterator() {
+                private boolean done = false;
 
-            @Override
-            public void close() throws SQLException {
-                try {
-                    /* 
-                     * Join the child mutation states in close, since this is called in a single threaded manner
-                     * after the parallel results have been processed. 
-                     * If auto-commit is on for the cloned child connection, then the finalState here is an empty mutation 
-                     * state (with no mutations). However, it still has the metrics for mutation work done by the 
-                     * mutating-iterator. Joining the mutation state makes sure those metrics are passed over
-                     * to the parent connection.
-                     */ 
-                    MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
-                } finally {
-                    clonedConnection.close();
+                @Override
+                public Tuple next() {
+                    if (done) {
+                        return null;
+                    }
+                    done = true;
+                    return tuple;
                 }
-            }
 
-            @Override
-            public Tuple peek() throws SQLException {
-                return done ? null : tuple;
+                @Override
+                public void explain(List<String> planSteps) {
+                }
+
+                @Override
+                public void close() throws SQLException {
+                    try {
+                        /*
+                         * Join the child mutation states in close, since this is called in a single
+                         * threaded manner after the parallel results have been processed.
+                         * If auto-commit is on for the cloned child connection, then the finalState
+                         * here is an empty mutation state (with no mutations). However, it still
+                         * has the metrics for mutation work done by the mutating-iterator.
+                         * Joining the mutation state makes sure those metrics are passed over
+                         * to the parent connection.
+                         */
+                        MutatingParallelIteratorFactory.this.connection.getMutationState()
+                                .join(finalState);
+                    } finally {
+                        clonedConnection.close();
+                    }
+                }
+
+                @Override
+                public Tuple peek() {
+                    return done ? null : tuple;
+                }
+            };
+        } catch (Throwable ex) {
+            // No iterator is handed back on this path, so the cloned connection must be
+            // closed here before the original failure is rethrown
+            try {
+                // closeQuietly only handles IOException
+                clonedConnection.close();
+            } catch (SQLException sqlEx) {
+                LOGGER.error("Closing cloned Phoenix connection inside iterator, failed with: ",
+                        sqlEx);
+                // Also attach the close failure to the failure being rethrown
+                ex.addSuppressed(sqlEx);
             }
-        };
+            throw ex;
+        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 97447f2..8e935b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -175,8 +175,10 @@ public class UpsertCompiler {
         mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
     }
     
-    public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
-            ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, boolean prefixSysColValues) throws SQLException {
+    public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef,
+            RowProjector projector, ResultIterator iterator, int[] columnIndexes,
+            int[] pkSlotIndexes, boolean useServerTimestamp,
+            boolean prefixSysColValues) throws SQLException {
         PhoenixStatement statement = childContext.getStatement();
         PhoenixConnection connection = statement.getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -233,22 +235,30 @@ public class UpsertCompiler {
                     Integer scale = rsScale == 0 ? null : rsScale;
                     // We are guaranteed that the two column will have compatible types,
                     // as we checked that before.
-                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), SortOrder.getDefault(), precision,
-                            scale, column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
-                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                            .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
-                            .buildException(); }
+                    if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
+                            SortOrder.getDefault(), precision,
+                            scale, column.getMaxLength(), column.getScale())) {
+                        throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(
+                                    column.getName().getString())
+                            .setMessage("value=" + column.getDataType()
+                                    .toStringLiteral(ptr, null)).build()
+                            .buildException();
+                    }
                     column.getDataType().coerceBytes(ptr, value, column.getDataType(), 
                             precision, scale, SortOrder.getDefault(), 
                             column.getMaxLength(), column.getScale(), column.getSortOrder(),
                             table.rowKeyOrderOptimizable());
                     values[j] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement,
+                        useServerTimestamp, indexMaintainer, viewConstants, null,
+                        numSplColumns);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (autoFlush && rowCount % batchSize == 0) {
-                    MutationState state = new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+                    MutationState state = new MutationState(tableRef, mutation, 0,
+                            maxSize, maxSizeBytes, connection);
                     connection.getMutationState().join(state);
                     connection.getMutationState().send();
                     mutation.clear();
@@ -291,8 +301,8 @@ public class UpsertCompiler {
             StatementContext childContext = new StatementContext(statement, false);
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
-            MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp, false);
-            return state;
+            return upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator,
+                    columnIndexes, pkSlotIndexes, useSeverTimestamp, false);
         }
         
         public void setRowProjector(RowProjector projector) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c13f86b..fe7425d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -1468,7 +1468,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     throw new RuntimeException(e);
                 } catch (ExecutionException e) {
                     LOGGER.info("Failed to execute task during cancel", e);
-                    continue;
                 }
             }
         } finally {