Posted to commits@phoenix.apache.org by gj...@apache.org on 2023/04/18 17:05:12 UTC

[phoenix] branch 5.1 updated: PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589)

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 6bcd8011a6 PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589)
6bcd8011a6 is described below

commit 6bcd8011a6152a0b5b53c04a1e2ef380616ccc39
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Mon Apr 17 10:06:51 2023 -0700

    PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589)
    
    Co-authored-by: Lokesh Khurana <lo...@salesforce.com>
---
 .../org/apache/phoenix/end2end/MapReduceIT.java    | 106 ++++++++++++++++++++-
 .../iterate/DelayedTableResultIteratorFactory.java |   9 +-
 .../phoenix/iterate/PhoenixQueryTimeoutIT.java     |  65 ++++++++++++-
 .../phoenix/iterate/BaseResultIterators.java       |  19 +++-
 .../phoenix/iterate/ChunkedResultIterator.java     |   3 +-
 .../iterate/DefaultTableResultIteratorFactory.java |   5 +-
 .../apache/phoenix/iterate/ParallelIterators.java  |   5 +-
 .../phoenix/iterate/ScanningResultIterator.java    |  12 ++-
 .../apache/phoenix/iterate/SerialIterators.java    |  10 +-
 .../phoenix/iterate/TableResultIterator.java       |  19 ++--
 .../iterate/TableResultIteratorFactory.java        |   3 +-
 .../iterate/TableSnapshotResultIterator.java       |   6 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java     |   8 +-
 13 files changed, 233 insertions(+), 37 deletions(-)
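
For readers skimming this change: the patch threads a maxQueryEndTime deadline from the iterator factories down to ScanningResultIterator, which now throws OPERATION_TIMED_OUT instead of continuing to page once the deadline has passed. Below is a minimal client-side sketch of the resulting behavior, mirroring the new PhoenixQueryTimeoutIT tests; it is not part of this commit, and the JDBC URL jdbc:phoenix:localhost and table MY_TABLE are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;

    import org.apache.phoenix.exception.SQLExceptionCode;
    import org.apache.phoenix.query.QueryServices;

    public class QueryTimeoutExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Force aggressive paging and a 1 ms query timeout, as the new tests do.
            props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, "0");
            props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, "1");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE")) {
                while (rs.next()) {
                    // With this fix, ScanningResultIterator stops paging once the
                    // deadline passes instead of retrying past the query timeout.
                }
            } catch (SQLException e) {
                // Expected path: the query fails fast with OPERATION_TIMED_OUT.
                if (e.getErrorCode() == SQLExceptionCode.OPERATION_TIMED_OUT.getErrorCode()) {
                    System.out.println("Query timed out as expected: " + e.getMessage());
                } else {
                    throw e;
                }
            }
        }
    }
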

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index 62082624ec..21b25bf92d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
 import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -107,6 +108,34 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         }
     }
 
+    @Test
+    public void testMapReduceWithVerySmallPhoenixQueryTimeout() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, null, true);
+        }
+    }
+
+    @Test
+    public void testMapReduceWithVerySmallPhoenixQueryTimeoutWithTenantId() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, TENANT_ID, true);
+        }
+    }
+
+    @Test
+    public void testMapReduceWithNormalPhoenixQueryTimeout() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, null, false);
+        }
+    }
+
+    @Test
+    public void testMapReduceWithNormalPhoenixQueryTimeoutWithTenantId() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 81.04, TENANT_ID, false);
+        }
+    }
+
     @Test
     public void testWithTenantId() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())){
@@ -116,7 +145,7 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
 
     }
 
-    private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws
+    private void createAndTestJob(Connection conn, String whereCondition, double maxExpected, String tenantId) throws
             SQLException, IOException, InterruptedException, ClassNotFoundException {
         String stockTableName = generateUniqueName();
         String stockStatsTableName = generateUniqueName();
@@ -126,12 +155,79 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         final Configuration conf = getUtility().getConfiguration();
         Job job = Job.getInstance(conf);
         if (tenantId != null) {
-            setInputForTenant(job, tenantId, stockTableName, s);
+            setInputForTenant(job, tenantId, stockTableName, whereCondition);
+        } else {
+            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
+                    stockTableName, whereCondition, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+        }
+        testJob(conn, job, stockTableName, stockStatsTableName, maxExpected);
+
+    }
+
+    private void createPagedJobAndTestFailedJobDueToTimeOut(Connection conn, String whereCondition, double maxExpected, String tenantId,
+            boolean testVerySmallTimeOut) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+        String stockTableName = generateUniqueName();
+        String stockStatsTableName = generateUniqueName();
+        conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
+        conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
+        conn.commit();
+        Configuration conf = new Configuration(getUtility().getConfiguration());
+        if (testVerySmallTimeOut) {
+            //Set the paging size to 0 and the query timeout to 1 ms so that the query gets paged quickly and times out immediately.
+            //These need to be set at the conf level because the queryPlan generated in PhoenixInputFormat creates a new connection
+            //from the JobContext's configuration.
+            conf.set(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0));
+            conf.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Integer.toString(1));
+        } else {
+            conf.set(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0));
+            conf.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Integer.toString(600000));
+        }
+
+        Job job = Job.getInstance(conf);
+        if (tenantId != null) {
+            setInputForTenant(job, tenantId, stockTableName, whereCondition);
         } else {
             PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
-                    stockTableName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+                    stockTableName, whereCondition, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+        }
+
+        assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+                TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
+        upsertData(conn, stockTableName);
+
+        // Run the job locally rather than spinning up a MiniMapReduce cluster; this also lets us use breakpoints.
+        job.getConfiguration().set("mapreduce.framework.name", "local");
+        setOutput(job, stockStatsTableName);
+
+        job.setMapperClass(StockMapper.class);
+        job.setReducerClass(StockReducer.class);
+        job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(DoubleWritable.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(StockWritable.class);
+
+        if (testVerySmallTimeOut) {
+            // run the job; it should fail due to the query timeout
+            assertFalse("Job should fail with QueryTimeout.", job.waitForCompletion(true));
+        } else {
+            //run
+            assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
+
+            //verify
+            ResultSet stats = DriverManager.getConnection(getUrl()).createStatement()
+                    .executeQuery("SELECT * FROM " + stockStatsTableName);
+            assertTrue("No data stored in stats table!", stats.next());
+            String name = stats.getString(1);
+            double max = stats.getDouble(2);
+            assertEquals("Got the wrong stock name!", "AAPL", name);
+            assertEquals("Max value didn't match the expected!", maxExpected, max, 0);
+            assertFalse("Should only have stored one row in stats table!", stats.next());
+            assertEquals("There should have been only be 1 call to getRegionBoundaries "
+                        + "(corresponding to the driver code)", 1,
+                TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
         }
-        testJob(conn, job, stockTableName, stockStatsTableName, v);
 
     }
 
@@ -199,8 +295,10 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
 
     private void upsertData(Connection conn, String stockTableName) throws SQLException {
         PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));
+        upsertData(stmt, "AAPL", 2010, new Double[]{73.48, 82.25, 75.2, 82.89});
         upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
         upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
+        upsertData(stmt, "AAPL", 2007, new Double[]{73.88, 80.24, 78.9, 66.3});
         conn.commit();
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 23bfebd449..74cb7f472e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -45,16 +45,17 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
     @Override
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
             Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
-            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches,
+            long maxQueryEndTime) throws SQLException {
         return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder,
-                renewLeaseThreshold, plan, scanGrouper, caches);
+                renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
     }
 
     private class DelayedTableResultIterator extends TableResultIterator {
         public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan,
                 ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan,
-                ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
-            super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches);
+                ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException {
+            super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
         }
         
         @Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
index 45adcfbc68..46a62d9c53 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -29,8 +30,10 @@ import java.util.Properties;
 
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -49,14 +52,13 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
         tableName = generateUniqueName();
         int numRows = 1000;
         String ddl =
-            "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+            "CREATE TABLE " + tableName + " (K INTEGER NOT NULL PRIMARY KEY, V VARCHAR)";
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute(ddl);
             String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
             PreparedStatement stmt = conn.prepareStatement(dml);
             for (int i = 1; i <= numRows; i++) {
-                String key = "key" + i;
-                stmt.setString(1, key);
+                stmt.setInt(1, i);
                 stmt.setString(2, "value" + i);
                 stmt.executeUpdate();
             }
@@ -99,10 +101,51 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
             }
             assertEquals("Unexpected number of records returned", 1000, count);
         } catch (Exception e) {
-            fail("Expected query to suceed");
+            fail("Expected query to succeed");
         }
     }
 
+    @Test
+    public void testScanningResultIteratorQueryTimeoutForPagingWithVeryLowTimeout() throws Exception {
+        //Arrange
+        PreparedStatement ps = loadDataAndPreparePagedQuery(1,1);
+
+        //Act + Assert
+        try {
+            //Do not let BaseResultIterators throw the timeout exception; let ScanningResultIterator handle it.
+            BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(true);
+            ResultSet rs = ps.executeQuery();
+            while(rs.next()) {}
+            fail("Expected query to timeout with a 1 ms timeout");
+        } catch (SQLException e) {
+            //OPERATION_TIMED_OUT Exception expected
+            assertEquals(OPERATION_TIMED_OUT.getErrorCode(), e.getErrorCode());
+        } finally {
+            BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
+        }
+    }
+
+    @Test
+    public void testScanningResultIteratorQueryTimeoutForPagingWithNormalLowTimeout() throws Exception {
+        //Arrange
+        PreparedStatement ps = loadDataAndPreparePagedQuery(30000,30);
+
+        //Act + Assert
+        try {
+            //Do not let BaseResultIterators throw the timeout exception; let ScanningResultIterator handle it.
+            BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(true);
+            ResultSet rs = ps.executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                count++;
+            }
+            assertEquals("Unexpected number of records returned", 500, count);
+        } catch (SQLException e) {
+            fail("Expected query to succeed");
+        } finally {
+            BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
+        }
+    }
     
     //-----------------------------------------------------------------
     // Private Helper Methods
@@ -118,4 +161,18 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
         assertEquals(timeoutSecs, phoenixStmt.getQueryTimeout());
         return ps;
     }
+
+    private PreparedStatement loadDataAndPreparePagedQuery(int timeoutMs, int timeoutSecs) throws Exception {
+        Properties props = new Properties();
+        //Set the paging size to 0 and the query timeout to the given value so that the query gets paged quickly and can time out.
+        props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, String.valueOf(timeoutMs));
+        props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0));
+        PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+        PreparedStatement ps = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE K % 2 = 0");
+        PhoenixStatement phoenixStmt = ps.unwrap(PhoenixStatement.class);
+        assertEquals(timeoutMs, phoenixStmt.getQueryTimeoutInMillis());
+        assertEquals(timeoutSecs, phoenixStmt.getQueryTimeout());
+        assertEquals(0, conn.getQueryServices().getProps().getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1));
+        return ps;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 45ec1f09cd..71263c0a6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -118,6 +118,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -169,6 +170,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     private final boolean useStatsForParallelization;
     protected Map<ImmutableBytesPtr,ServerCache> caches;
     private final QueryPlan dataPlan;
+    private static boolean forTestingSetTimeoutToMaxToLetQueryPassHere = false;
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -1348,7 +1350,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection());
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
-            submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper);
+            submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
                 List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
@@ -1357,6 +1359,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     Pair<Scan,Future<PeekingResultIterator>> scanPair = scanPairItr.next();
                     try {
                         long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
+                        if (forTestingSetTimeoutToMaxToLetQueryPassHere) {
+                            timeOutForScan = Long.MAX_VALUE;
+                        }
                         if (timeOutForScan < 0) {
                             throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
                                     ". Query couldn't be completed in the allotted time: "
@@ -1598,7 +1603,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException;
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper,
+            long maxQueryEndTime) throws SQLException;
     
     @Override
     public int size() {
@@ -1710,4 +1716,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return this.estimateInfoTimestamp;
     }
 
+    /**
+     * Used by specific test cases to check whether timeouts are working in ScanningResultIterator.
+     * @param setTimeoutToMax if true, disables the timeout check in BaseResultIterators so that ScanningResultIterator handles it
+     */
+    @VisibleForTesting
+    public static void setForTestingSetTimeoutToMaxToLetQueryPassHere(boolean setTimeoutToMax) {
+        forTestingSetTimeoutToMaxToLetQueryPassHere = setTimeoutToMax;
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index f584e12f6b..b4940c2cfa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -163,10 +163,11 @@ public class ChunkedResultIterator implements PeekingResultIterator {
             ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
                     context.getConnection().getLogLevel());
             long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            //Chunking is deprecated, so the timeout is set to Long.MAX_VALUE here.
             ResultIterator singleChunkResultIterator =
                     new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
                             scanMetricsHolder, renewLeaseThreshold, plan,
-                            DefaultParallelScanGrouper.getInstance()), chunkSize);
+                            DefaultParallelScanGrouper.getInstance(), Long.MAX_VALUE), chunkSize);
             resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
         }
         return resultIterator;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index 44c714f61d..1008b1b939 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -33,9 +33,10 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac
        @Override
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
             Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
-            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches,
+            long maxQueryEndTime) throws SQLException {
         return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold,
-                plan, scanGrouper, caches);
+                plan, scanGrouper, caches, maxQueryEndTime);
     }
 
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 828de398f4..5b0b04b05f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -80,7 +80,8 @@ public class ParallelIterators extends BaseResultIterators {
     
     @Override
     protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper,
+            long maxQueryEndTime) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -116,7 +117,7 @@ public class ParallelIterators extends BaseResultIterators {
             final TableResultIterator tableResultItr =
                     context.getConnection().getTableResultIteratorFactory().newIterator(
                         mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan,
-                        scanGrouper, caches);
+                        scanGrouper, caches, maxQueryEndTime);
             context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 51d7bf5dc5..8189fbcef9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRI
 import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
 import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME;
 import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
@@ -62,6 +63,7 @@ import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,14 +79,16 @@ public class ScanningResultIterator implements ResultIterator {
     private static boolean throwExceptionIfScannerClosedForceFully = false;
 
     private final boolean isMapReduceContext;
+    private final long maxQueryEndTime;
 
-    public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext) {
+    public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime) {
         this.scanner = scanner;
         this.scanMetricsHolder = scanMetricsHolder;
         this.context = context;
         scanMetricsUpdated = false;
         scanMetricsEnabled = scan.isScanMetricsEnabled();
         this.isMapReduceContext = isMapReduceContext;
+        this.maxQueryEndTime = maxQueryEndTime;
     }
 
     @Override
@@ -171,6 +175,12 @@ public class ScanningResultIterator implements ResultIterator {
         try {
             Result result = scanner.next();
             while (result != null && (result.isEmpty() || isDummy(result))) {
+                long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
+                if (timeOutForScan < 0) {
+                    throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
+                            ". Query couldn't be completed in the allotted time : "
+                                    + context.getStatement().getQueryTimeoutInMillis() + " ms").build().buildException();
+                }
                 if (!isMapReduceContext && (context.getConnection().isClosing() || context.getConnection().isClosed())) {
                     LOG.warn("Closing ResultScanner as Connection is already closed or in middle of closing");
                     if (throwExceptionIfScannerClosedForceFully) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 8674f3f2a6..ff394a3687 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -81,7 +81,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper) {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper, long maxQueryEndTime) {
         ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
         final String tableName = tableRef.getTable().getPhysicalName().getString();
         final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
@@ -100,7 +100,7 @@ public class SerialIterators extends BaseResultIterators {
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 @Override
                 public PeekingResultIterator call() throws Exception {
-                    PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches);
+                    PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches, maxQueryEndTime);
                     return itr;
                 }
 
@@ -143,14 +143,16 @@ public class SerialIterators extends BaseResultIterators {
         private PeekingResultIterator currentIterator;
         private Integer remainingOffset;
         private Map<ImmutableBytesPtr,ServerCache> caches;
+        private final long maxQueryEndTime;
         
-        private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+        private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException {
             this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size());
             this.tableName = tableName;
             this.renewLeaseThreshold = renewLeaseThreshold;
             this.scans.addAll(flattenedScans);
             this.remainingOffset = offset;
             this.caches = caches;
+            this.maxQueryEndTime = maxQueryEndTime;
             if (this.remainingOffset != null) {
                 // mark the last scan for offset purposes
                 this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
@@ -183,7 +185,7 @@ public class SerialIterators extends BaseResultIterators {
                             context.getConnection().getLogLevel());
                 TableResultIterator itr =
                         new TableResultIterator(mutationState, currentScan, scanMetricsHolder,
-                                renewLeaseThreshold, plan, scanGrouper, caches);
+                                renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
                 PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
                 Tuple tuple;
                 if ((tuple = peekingItr.peek()) == null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 5e7e2cfe8f..9ef7eb8200 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -100,6 +100,7 @@ public class TableResultIterator implements ResultIterator {
     private HashCacheClient hashCacheClient;
 
     private final boolean isMapReduceContext;
+    private final long maxQueryEndTime;
 
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
@@ -112,6 +113,7 @@ public class TableResultIterator implements ResultIterator {
         this.caches = null;
         this.retry = 0;
         this.isMapReduceContext = false;
+        this.maxQueryEndTime = Long.MAX_VALUE;
     }
 
     public static enum RenewLeaseStatus {
@@ -119,23 +121,23 @@ public class TableResultIterator implements ResultIterator {
     };
 
     public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
-            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
-        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, false);
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, long maxQueryEndTime) throws SQLException {
+        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, false, maxQueryEndTime);
     }
 
     public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
-            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
-        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, false);
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException {
+        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, false, maxQueryEndTime);
     }
 
     public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
-            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, boolean isMapReduceContext) throws SQLException {
-        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, isMapReduceContext);
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, boolean isMapReduceContext, long maxQueryEndTime) throws SQLException {
+        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, isMapReduceContext, maxQueryEndTime);
     }
 
     public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches,
-            boolean isMapReduceContext) throws SQLException {
+            boolean isMapReduceContext, long maxQueryEndTime) throws SQLException {
         this.scan = scan;
         this.scanMetricsHolder = scanMetricsHolder;
         this.plan = plan;
@@ -149,6 +151,7 @@ public class TableResultIterator implements ResultIterator {
         this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
         this.isMapReduceContext = isMapReduceContext;
+        this.maxQueryEndTime = maxQueryEndTime;
         ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection());
     }
 
@@ -246,7 +249,7 @@ public class TableResultIterator implements ResultIterator {
             if (delegate == UNINITIALIZED_SCANNER) {
                 try {
                     this.scanIterator =
-                            new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder, plan.getContext(), isMapReduceContext);
+                            new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder, plan.getContext(), isMapReduceContext, maxQueryEndTime);
                 } catch (IOException e) {
                     Closeables.closeQuietly(htable);
                     throw ServerUtil.parseServerException(e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 0b28d5a2f2..fb573bfb23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.schema.TableRef;
 public interface TableResultIteratorFactory {
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
             Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
-            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException;
+            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches,
+            long maxQueryEndTime) throws SQLException;
 
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index da3f6becc1..b047dbccb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -77,8 +77,9 @@ public class TableSnapshotResultIterator implements ResultIterator {
   private StatementContext context;
 
   private final boolean isMapReduceContext;
+  private final long maxQueryEndTime;
 
-  public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext)
+  public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime)
       throws IOException {
     this.configuration = configuration;
     this.currentRegion = -1;
@@ -97,6 +98,7 @@ public class TableSnapshotResultIterator implements ResultIterator {
     this.rootDir = CommonFSUtils.getRootDir(configuration);
     this.fs = rootDir.getFileSystem(configuration);
     this.isMapReduceContext = isMapReduceContext;
+    this.maxQueryEndTime = maxQueryEndTime;
     init();
   }
 
@@ -158,7 +160,7 @@ public class TableSnapshotResultIterator implements ResultIterator {
         RegionInfo hri = regions.get(this.currentRegion);
         this.scanIterator =
             new ScanningResultIterator(new SnapshotScanner(configuration, fs, restoreDir, htd, hri, scan),
-                scan, scanMetricsHolder, context, isMapReduceContext);
+                scan, scanMetricsHolder, context, isMapReduceContext, maxQueryEndTime);
       } catch (Throwable e) {
         throw ServerUtil.parseServerException(e);
       }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 970eb6f074..014e47b4e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,6 +132,9 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
                 // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
 
+                //Compute the query deadline from the statement's query timeout
+                final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+                final long maxQueryEndTime = startTime + queryPlan.getContext().getStatement().getQueryTimeoutInMillis();
                 PeekingResultIterator peekingResultIterator;
                 ScanMetricsHolder scanMetricsHolder =
                   ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
@@ -138,7 +142,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
                 if (snapshotName != null) {
                   // result iterator to read snapshots
                   final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan,
-                      scanMetricsHolder, queryPlan.getContext(), true);
+                      scanMetricsHolder, queryPlan.getContext(), true, maxQueryEndTime);
                     peekingResultIterator = LookAheadResultIterator.wrap(tableSnapshotResultIterator);
                     LOGGER.info("Adding TableSnapshotResultIterator for scan: " + scan);
                 } else {
@@ -146,7 +150,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
                       new TableResultIterator(
                           queryPlan.getContext().getConnection().getMutationState(), scan,
                           scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
-                          this.scanGrouper, true);
+                          this.scanGrouper, true, maxQueryEndTime);
                   peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                   LOGGER.info("Adding TableResultIterator for scan: " + scan);
                 }