You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2023/01/25 00:28:49 UTC

[phoenix] branch master updated: PHOENIX-6776 Abort scans of closed connections at ScanningResultIterator (#1551)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 320a266a4d PHOENIX-6776 Abort scans of closed connections at ScanningResultIterator (#1551)
320a266a4d is described below

commit 320a266a4d12715444967500fa6373fd53ad0762
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Tue Jan 24 16:28:41 2023 -0800

    PHOENIX-6776 Abort scans of closed connections at ScanningResultIterator (#1551)
---
 .../end2end/PreMatureTimelyAbortScanIt.java        | 89 ++++++++++++++++++++++
 .../coprocessor/BaseScannerRegionObserver.java     |  2 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |  5 +-
 .../phoenix/iterate/BaseResultIterators.java       |  3 +
 .../phoenix/iterate/ScanningResultIterator.java    | 29 ++++++-
 .../phoenix/iterate/TableResultIterator.java       | 25 ++++--
 .../iterate/TableSnapshotResultIterator.java       | 10 ++-
 .../org/apache/phoenix/jdbc/PhoenixConnection.java | 23 +++++-
 .../phoenix/mapreduce/PhoenixRecordReader.java     |  4 +-
 9 files changed, 171 insertions(+), 19 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreMatureTimelyAbortScanIt.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreMatureTimelyAbortScanIt.java
new file mode 100644
index 0000000000..c0729707be
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreMatureTimelyAbortScanIt.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.iterate.ScanningResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test checking that a query fails fast, rather than scanning to completion, once the
+ * connection that issued it has been flagged as closing.
+ */
+@Category(ParallelStatsDisabledTest.class)
+public class PreMatureTimelyAbortScanIt extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(PreMatureTimelyAbortScanIt.class);
+
+    /**
+     * Starts the test driver with a one hour max lookback age, stats-based parallelization
+     * disabled and a server page size of 0 ms.
+     */
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
+        props.put(BaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60*60)); // An hour
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false));
+        // NOTE(review): a 0 ms page size presumably makes the server hand back paging (dummy)
+        // results right away so the client-side abort check is reached - confirm.
+        props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    /** Returns the base test URL with a freshly generated unique name appended. */
+    private String getUniqueUrl() {
+        return url + generateUniqueName();
+    }
+
+    /**
+     * Loads 100 rows, runs a COUNT(*) query, flags the connection as closing and expects the
+     * next fetch to fail with {@link SQLExceptionCode#FAILED_KNOWINGLY_FOR_TEST}.
+     */
+    @Test
+    public void testPreMatureScannerAbortForCount() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUniqueUrl())) {
+            conn.createStatement().execute("CREATE TABLE LONG_BUG (ID INTEGER PRIMARY KEY, AMOUNT DECIMAL) SALT_BUCKETS = 16 ");
+        }
+        try (Connection conn = DriverManager.getConnection(getUniqueUrl())) {
+            for (int i = 0; i<100 ; i++) {
+                int amount = -50000 + i;
+                String s = "UPSERT INTO LONG_BUG (ID, AMOUNT) VALUES( " + i + ", " + amount + ")";
+                conn.createStatement().execute(s);
+            }
+            conn.commit();
+        }
+
+        PhoenixConnection conn = DriverManager.getConnection(getUniqueUrl()).unwrap(PhoenixConnection.class);
+        try {
+            ScanningResultIterator.setIsScannerClosedForcefully(true);
+            ResultSet resultSet = conn.createStatement().executeQuery(
+                    "SELECT COUNT(*) FROM LONG_BUG WHERE ID % 2 = 0");
+            conn.setIsClosing(true);
+            resultSet.next();
+            LOG.info("Count of modulus 2 for LONG_BUG :- " + resultSet.getInt(1));
+            fail("ResultSet should have been closed");
+        } catch (SQLException sqe) {
+            assertEquals(SQLExceptionCode.FAILED_KNOWINGLY_FOR_TEST.getErrorCode(), sqe.getErrorCode());
+        } finally {
+            ScanningResultIterator.setIsScannerClosedForcefully(false);
+            // close() returns early while the connection reports itself as closing, so clear the
+            // simulated flag first; otherwise this connection would never be released.
+            conn.setIsClosing(false);
+            conn.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9f47169fc3..ab7119bfbc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -262,7 +262,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
             // last possible moment. You need to swap the start/stop and make the
             // start exclusive and the stop inclusive.
             ScanUtil.setupReverseScan(scan);
-            if (scan.getFilter() != null && !(scan.getFilter() instanceof PagedFilter)) {
+            if (!(scan.getFilter() instanceof PagedFilter)) {
                 byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
                 if (pageSizeMsBytes != null) {
                     scan.setFilter(new PagedFilter(scan.getFilter(), getPageSizeMsForFilter(scan)));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 63b7ba0904..771b06a42d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -583,7 +583,10 @@ public enum SQLExceptionCode {
 
     CANNOT_TRANSFORM_TABLE_WITH_APPEND_ONLY_SCHEMA(913, "43M24", "Cannot transform a table with append-only schema."),
 
-    CANNOT_TRANSFORM_TRANSACTIONAL_TABLE(914, "43M25", "Cannot transform a transactional table.");
+    CANNOT_TRANSFORM_TRANSACTIONAL_TABLE(914, "43M25", "Cannot transform a transactional table."),
+
+    //SQLCode for testing exceptions
+    FAILED_KNOWINGLY_FOR_TEST(7777, "TEST", "Exception was thrown to test something");
 
     private final int errorCode;
     private final String sqlState;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index cbbb5554ad..7dfcba334a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -1415,6 +1416,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             }
                             
                         }
+                    } catch (CancellationException ce) {
+                        LOGGER.warn("Iterator scheduled to be executed in Future was being cancelled", ce);
                     }
                 }
                 addIterator(iterators, concatIterators);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 8b30882325..35c5701ddb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -53,24 +53,52 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.phoenix.compile.ExplainPlanAttributes
     .ExplainPlanAttributesBuilder;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ScanningResultIterator implements ResultIterator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScanningResultIterator.class);
     private final ResultScanner scanner;
     private final ScanMetricsHolder scanMetricsHolder;
     boolean scanMetricsUpdated;
     boolean scanMetricsEnabled;
+    // Supplies the connection whose closing/closed state is checked while iterating.
+    private final StatementContext context;
+    // Test-only switch: when true, an abandoned scan throws instead of closing quietly. Volatile
+    // because the static setter and the scanning code may run on different threads.
+    private static volatile boolean throwExceptionIfScannerClosedForceFully = false;
+
+    // When true, the connection-state check during iteration is skipped.
+    private final boolean isMapReduceContext;
 
+    /**
+     * Creates an iterator over the rows produced by the given scanner.
+     *
+     * @param scanner scanner to pull results from
+     * @param scan scan definition, consulted for whether scan metrics are enabled
+     * @param scanMetricsHolder metrics holder kept on this iterator
+     * @param context statement context supplying the connection whose closing/closed state is
+     *                checked during iteration; must be non-null when isMapReduceContext is false
+     * @param isMapReduceContext when true, the connection-state check is skipped
+     */
-    public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder) {
+    public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext) {
         this.scanner = scanner;
         this.scanMetricsHolder = scanMetricsHolder;
+        this.context = context;
         scanMetricsUpdated = false;
         scanMetricsEnabled = scan.isScanMetricsEnabled();
+        this.isMapReduceContext = isMapReduceContext;
     }
 
     @Override
@@ -159,6 +173,14 @@ public class ScanningResultIterator implements ResultIterator {
         try {
             Result result = scanner.next();
             while (result != null && (result.isEmpty() || isDummy(result))) {
+                if (!isMapReduceContext && (context.getConnection().isClosing() || context.getConnection().isClosed())) {
+                    LOG.warn("Closing ResultScanner as Connection is already closed or in middle of closing");
+                    if (throwExceptionIfScannerClosedForceFully) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.FAILED_KNOWINGLY_FOR_TEST).build().buildException();
+                    }
+                    close();
+                    return null;
+                }
                 result = scanner.next();
             }
             if (result == null) {
@@ -190,4 +212,17 @@ public class ScanningResultIterator implements ResultIterator {
     public ResultScanner getScanner() {
         return scanner;
     }
+
+    /**
+     * Test hook controlling what happens when a scan is abandoned because its connection is
+     * closing or closed.
+     *
+     * @param throwException when true, the abandoned scan throws a
+     *                       {@code FAILED_KNOWINGLY_FOR_TEST} exception; when false, the scanner
+     *                       is closed and no row is returned
+     */
+    @VisibleForTesting
+    public static void setIsScannerClosedForcefully(boolean throwException) {
+        throwExceptionIfScannerClosedForceFully = throwException;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9342ddc818..5e7e2cfe8f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -26,7 +26,6 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
-import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -37,7 +36,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -47,7 +45,6 @@ import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ExplainPlanAttributes
     .ExplainPlanAttributesBuilder;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
@@ -55,10 +52,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
@@ -105,6 +99,8 @@ public class TableResultIterator implements ResultIterator {
     private Map<ImmutableBytesPtr,ServerCache> caches;
     private HashCacheClient hashCacheClient;
 
+    private final boolean isMapReduceContext;
+
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
         this.scanMetricsHolder = null;
@@ -115,6 +111,7 @@ public class TableResultIterator implements ResultIterator {
         this.scanGrouper = null;
         this.caches = null;
         this.retry = 0;
+        this.isMapReduceContext = false;
     }
 
     public static enum RenewLeaseStatus {
@@ -123,11 +120,29 @@ public class TableResultIterator implements ResultIterator {
 
     public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
-        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null);
+        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, false);
     }
 
     public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, false);
+    }
+
+    /**
+     * Convenience overload without server caches; delegates with {@code caches} set to null.
+     */
+    public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, boolean isMapReduceContext) throws SQLException {
+        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, isMapReduceContext);
+    }
+
+    /**
+     * Constructor that the other public overloads delegate to. {@code isMapReduceContext} is
+     * stored on this iterator; the overloads that do not take it pass false.
+     */
+    public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches,
+            boolean isMapReduceContext) throws SQLException {
         this.scan = scan;
         this.scanMetricsHolder = scanMetricsHolder;
         this.plan = plan;
@@ -140,6 +148,7 @@ public class TableResultIterator implements ResultIterator {
         this.caches = caches;
         this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
+        this.isMapReduceContext = isMapReduceContext;
         ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection());
     }
 
@@ -237,7 +246,7 @@ public class TableResultIterator implements ResultIterator {
             if (delegate == UNINITIALIZED_SCANNER) {
                 try {
                     this.scanIterator =
-                            new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder);
+                            new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder, plan.getContext(), isMapReduceContext);
                 } catch (IOException e) {
                     Closeables.closeQuietly(htable);
                     throw ServerUtil.parseServerException(e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index e4e9bef910..20318eee77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -74,12 +75,16 @@ public class TableSnapshotResultIterator implements ResultIterator {
   private FileSystem fs;
   private int currentRegion;
   private boolean closed = false;
+  private StatementContext context;
 
-  public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder)
+  private final boolean isMapReduceContext;
+
+  public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext)
       throws IOException {
     this.configuration = configuration;
     this.currentRegion = -1;
     this.scan = scan;
+    this.context = context;
     this.scanMetricsHolder = scanMetricsHolder;
     this.scanIterator = UNINITIALIZED_SCANNER;
     if (PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) {
@@ -92,6 +97,7 @@ public class TableSnapshotResultIterator implements ResultIterator {
         PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
     this.rootDir = CommonFSUtils.getRootDir(configuration);
     this.fs = rootDir.getFileSystem(configuration);
+    this.isMapReduceContext = isMapReduceContext;
     init();
   }
 
@@ -153,7 +159,7 @@ public class TableSnapshotResultIterator implements ResultIterator {
         RegionInfo hri = regions.get(this.currentRegion);
         this.scanIterator =
             new ScanningResultIterator(new SnapshotScanner(configuration, fs, restoreDir, htd, hri, scan),
-                scan, scanMetricsHolder);
+                scan, scanMetricsHolder, context, isMapReduceContext);
       } catch (Throwable e) {
         throw ServerUtil.parseServerException(e);
       }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 79930371e9..50c037501c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -166,6 +166,7 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
     private int statementExecutionCounter;
     private TraceScope traceScope = null;
     private volatile boolean isClosed = false;
+    private volatile boolean isClosing = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
     private Consistency consistency = Consistency.STRONG;
@@ -700,7 +701,7 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
     }
 
     void checkOpen() throws SQLException {
-        if (isClosed) {
+        if (isClosed || isClosing) {
             throw reasonForClose != null
                 ? reasonForClose
                 : new SQLExceptionInfo.Builder(SQLExceptionCode.CONNECTION_CLOSED)
@@ -719,7 +720,7 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
      * @see #close()
      */
     public void close(SQLException reasonForClose) throws SQLException {
-        if (isClosed) {
+        if (isClosed || isClosing) {
             return;
         }
         this.reasonForClose = reasonForClose;
@@ -731,11 +732,12 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
     //Does this need to be synchronized?
     @Override
     synchronized public void close() throws SQLException {
-        if (isClosed) {
+        if (isClosed || isClosing) {
             return;
         }
 
         try {
+            isClosing = true;
             TableMetricsManager.pushMetricsFromConnInstanceMethod(getMutationMetrics());
             if(!(reasonForClose instanceof FailoverSQLException)) {
                 // If the reason for close is because of failover, the metrics will be kept for
@@ -757,6 +759,7 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
             }
 
         } finally {
+            isClosing = false;
             isClosed = true;
             if(isInternalConnection()){
                 GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
@@ -965,6 +968,17 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
         return isClosed;
     }
 
+    /**
+     * Reports whether this connection is in the middle of closing. The flag is raised at the
+     * start of {@link #close()} and cleared again when that call finishes.
+     *
+     * @return the current value of the {@code isClosing} flag
+     * @throws SQLException never thrown by this implementation
+     */
+    public boolean isClosing() throws SQLException {
+        return isClosing;
+    }
+
     @Override
     public boolean isReadOnly() throws SQLException {
         return readOnly;
@@ -973,7 +980,8 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
     @Override
     public boolean isValid(int timeout) throws SQLException {
         // TODO: run query here or ping
-        return !isClosed;
+        // A connection that has started closing is reported as invalid as well.
+        return !isClosed && !isClosing;
     }
 
     @Override
@@ -1362,6 +1369,14 @@ public class PhoenixConnection implements MetaDataMutated, SQLCloseable, Phoenix
         this.tableResultIteratorFactory = factory;
     }
 
+    /**
+     * Test-only hook that forces the {@code isClosing} flag to the given value. Do not use elsewhere.
+     */
+    @VisibleForTesting
+    public void setIsClosing(boolean imitateIsClosing) {
+        isClosing = imitateIsClosing;
+    }
+
     @Override
     public void removeSchema(PSchema schema, long schemaTimeStamp) {
         getQueryServices().removeSchema(schema, schemaTimeStamp);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 91d3da40b4..970eb6f074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -138,7 +138,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
                 if (snapshotName != null) {
                   // result iterator to read snapshots
                   final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan,
-                      scanMetricsHolder);
+                      scanMetricsHolder, queryPlan.getContext(), true);
                     peekingResultIterator = LookAheadResultIterator.wrap(tableSnapshotResultIterator);
                     LOGGER.info("Adding TableSnapshotResultIterator for scan: " + scan);
                 } else {
@@ -146,7 +146,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
                       new TableResultIterator(
                           queryPlan.getContext().getConnection().getMutationState(), scan,
                           scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
-                          this.scanGrouper);
+                          this.scanGrouper, true);
                   peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                   LOGGER.info("Adding TableResultIterator for scan: " + scan);
                 }