You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2023/04/18 17:05:12 UTC
[phoenix] branch 5.1 updated: PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589)
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 6bcd8011a6 PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589)
6bcd8011a6 is described below
commit 6bcd8011a6152a0b5b53c04a1e2ef380616ccc39
Author: Lokesh Khurana <kh...@gmail.com>
AuthorDate: Mon Apr 17 10:06:51 2023 -0700
PHOENIX-6918 :- ScanningResultIterator should not retry when the query times out (#1589)
Co-authored-by: Lokesh Khurana <lo...@salesforce.com>
---
.../org/apache/phoenix/end2end/MapReduceIT.java | 106 ++++++++++++++++++++-
.../iterate/DelayedTableResultIteratorFactory.java | 9 +-
.../phoenix/iterate/PhoenixQueryTimeoutIT.java | 65 ++++++++++++-
.../phoenix/iterate/BaseResultIterators.java | 19 +++-
.../phoenix/iterate/ChunkedResultIterator.java | 3 +-
.../iterate/DefaultTableResultIteratorFactory.java | 5 +-
.../apache/phoenix/iterate/ParallelIterators.java | 5 +-
.../phoenix/iterate/ScanningResultIterator.java | 12 ++-
.../apache/phoenix/iterate/SerialIterators.java | 10 +-
.../phoenix/iterate/TableResultIterator.java | 19 ++--
.../iterate/TableResultIteratorFactory.java | 3 +-
.../iterate/TableSnapshotResultIterator.java | 6 +-
.../phoenix/mapreduce/PhoenixRecordReader.java | 8 +-
13 files changed, 233 insertions(+), 37 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index 62082624ec..21b25bf92d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -107,6 +108,34 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testMapReduceWithVerySmallPhoenixQueryTimeout() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, null, true);
+ }
+ }
+
+ @Test
+ public void testMapReduceWithVerySmallPhoenixQueryTimeoutWithTenantId() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, TENANT_ID, true);
+ }
+ }
+
+ @Test
+ public void testMapReduceWithNormalPhoenixQueryTimeout() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 82.89, null, false);
+ }
+ }
+
+ @Test
+ public void testMapReduceWithNormalPhoenixQueryTimeoutWithTenantId() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createPagedJobAndTestFailedJobDueToTimeOut(conn, RECORDING_YEAR + " % 2 = 0", 81.04, TENANT_ID, false);
+ }
+ }
+
@Test
public void testWithTenantId() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())){
@@ -116,7 +145,7 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
}
- private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws
+ private void createAndTestJob(Connection conn, String whereCondition, double maxExpected, String tenantId) throws
SQLException, IOException, InterruptedException, ClassNotFoundException {
String stockTableName = generateUniqueName();
String stockStatsTableName = generateUniqueName();
@@ -126,12 +155,79 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
final Configuration conf = getUtility().getConfiguration();
Job job = Job.getInstance(conf);
if (tenantId != null) {
- setInputForTenant(job, tenantId, stockTableName, s);
+ setInputForTenant(job, tenantId, stockTableName, whereCondition);
+ } else {
+ PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
+ stockTableName, whereCondition, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ }
+ testJob(conn, job, stockTableName, stockStatsTableName, maxExpected);
+
+ }
+
+ private void createPagedJobAndTestFailedJobDueToTimeOut(Connection conn, String whereCondition, double maxExpected, String tenantId,
+ boolean testVerySmallTimeOut) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+ String stockTableName = generateUniqueName();
+ String stockStatsTableName = generateUniqueName();
+ conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
+ conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
+ conn.commit();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ if (testVerySmallTimeOut) {
+ //Set the paging size to 0 and the query timeout to 1 ms so that the query gets paged quickly and times out immediately.
+ //This needs to be set at the conf level because the queryPlan generated in PhoenixInputFormat creates a new connection from
+ //the JobContext's configuration.
+ conf.set(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0));
+ conf.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Integer.toString(1));
+ } else {
+ conf.set(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0));
+ conf.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Integer.toString(600000));
+ }
+
+ Job job = Job.getInstance(conf);
+ if (tenantId != null) {
+ setInputForTenant(job, tenantId, stockTableName, whereCondition);
} else {
PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
- stockTableName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ stockTableName, whereCondition, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+ }
+
+ assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+ TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
+ upsertData(conn, stockTableName);
+
+ // Run locally rather than spinning up a MiniMapReduce cluster; this also lets us use breakpoints.
+ job.getConfiguration().set("mapreduce.framework.name", "local");
+ setOutput(job, stockStatsTableName);
+
+ job.setMapperClass(StockMapper.class);
+ job.setReducerClass(StockReducer.class);
+ job.setOutputFormatClass(PhoenixOutputFormat.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(DoubleWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(StockWritable.class);
+
+ if (testVerySmallTimeOut) {
+ // run job and it should fail due to Timeout
+ assertFalse("Job should fail with QueryTimeout.", job.waitForCompletion(true));
+ } else {
+ //run
+ assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
+
+ //verify
+ ResultSet stats = DriverManager.getConnection(getUrl()).createStatement()
+ .executeQuery("SELECT * FROM " + stockStatsTableName);
+ assertTrue("No data stored in stats table!", stats.next());
+ String name = stats.getString(1);
+ double max = stats.getDouble(2);
+ assertEquals("Got the wrong stock name!", "AAPL", name);
+ assertEquals("Max value didn't match the expected!", maxExpected, max, 0);
+ assertFalse("Should only have stored one row in stats table!", stats.next());
+ assertEquals("There should have been only be 1 call to getRegionBoundaries "
+ + "(corresponding to the driver code)", 1,
+ TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
}
- testJob(conn, job, stockTableName, stockStatsTableName, v);
}
@@ -199,8 +295,10 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
private void upsertData(Connection conn, String stockTableName) throws SQLException {
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName));
+ upsertData(stmt, "AAPL", 2010, new Double[]{73.48, 82.25, 75.2, 82.89});
upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
+ upsertData(stmt, "AAPL", 2007, new Double[]{73.88, 80.24, 78.9, 66.3});
conn.commit();
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 23bfebd449..74cb7f472e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -45,16 +45,17 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
@Override
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
- QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+ QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches,
+ long maxQueryEndTime) throws SQLException {
return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetricsHolder,
- renewLeaseThreshold, plan, scanGrouper, caches);
+ renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
}
private class DelayedTableResultIterator extends TableResultIterator {
public DelayedTableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan,
ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold, QueryPlan plan,
- ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
- super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches);
+ ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException {
+ super(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
}
@Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
index 45adcfbc68..46a62d9c53 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.iterate;
+import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -29,8 +30,10 @@ import java.util.Properties;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -49,14 +52,13 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
tableName = generateUniqueName();
int numRows = 1000;
String ddl =
- "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+ "CREATE TABLE " + tableName + " (K INTEGER NOT NULL PRIMARY KEY, V VARCHAR)";
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute(ddl);
String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
PreparedStatement stmt = conn.prepareStatement(dml);
for (int i = 1; i <= numRows; i++) {
- String key = "key" + i;
- stmt.setString(1, key);
+ stmt.setInt(1, i);
stmt.setString(2, "value" + i);
stmt.executeUpdate();
}
@@ -99,10 +101,51 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
}
assertEquals("Unexpected number of records returned", 1000, count);
} catch (Exception e) {
- fail("Expected query to suceed");
+ fail("Expected query to succeed");
}
}
+ @Test
+ public void testScanningResultIteratorQueryTimeoutForPagingWithVeryLowTimeout() throws Exception {
+ //Arrange
+ PreparedStatement ps = loadDataAndPreparePagedQuery(1,1);
+
+ //Act + Assert
+ try {
+ //Do not let BaseResultIterators throw the timeout exception; let ScanningResultIterator handle it.
+ BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(true);
+ ResultSet rs = ps.executeQuery();
+ while(rs.next()) {}
+ fail("Expected query to timeout with a 1 ms timeout");
+ } catch (SQLException e) {
+ //OPERATION_TIMED_OUT Exception expected
+ assertEquals(OPERATION_TIMED_OUT.getErrorCode(), e.getErrorCode());
+ } finally {
+ BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
+ }
+ }
+
+ @Test
+ public void testScanningResultIteratorQueryTimeoutForPagingWithNormalLowTimeout() throws Exception {
+ //Arrange
+ PreparedStatement ps = loadDataAndPreparePagedQuery(30000,30);
+
+ //Act + Assert
+ try {
+ //Do not let BaseResultIterators throw the timeout exception; let ScanningResultIterator handle it.
+ BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(true);
+ ResultSet rs = ps.executeQuery();
+ int count = 0;
+ while(rs.next()) {
+ count++;
+ }
+ assertEquals("Unexpected number of records returned", 500, count);
+ } catch (SQLException e) {
+ fail("Expected query to succeed");
+ } finally {
+ BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
+ }
+ }
//-----------------------------------------------------------------
// Private Helper Methods
@@ -118,4 +161,18 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
assertEquals(timeoutSecs, phoenixStmt.getQueryTimeout());
return ps;
}
+
+ private PreparedStatement loadDataAndPreparePagedQuery(int timeoutMs, int timeoutSecs) throws Exception {
+ Properties props = new Properties();
+ //Set the paging size to 0 so that the query gets paged quickly, and set the query timeout to the requested timeoutMs.
+ props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, String.valueOf(timeoutMs));
+ props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(0));
+ PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+ PreparedStatement ps = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE K % 2 = 0");
+ PhoenixStatement phoenixStmt = ps.unwrap(PhoenixStatement.class);
+ assertEquals(timeoutMs, phoenixStmt.getQueryTimeoutInMillis());
+ assertEquals(timeoutSecs, phoenixStmt.getQueryTimeout());
+ assertEquals(0, conn.getQueryServices().getProps().getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1));
+ return ps;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 45ec1f09cd..71263c0a6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -118,6 +118,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -169,6 +170,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private final boolean useStatsForParallelization;
protected Map<ImmutableBytesPtr,ServerCache> caches;
private final QueryPlan dataPlan;
+ private static boolean forTestingSetTimeoutToMaxToLetQueryPassHere = false;
static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
@Override
@@ -1348,7 +1350,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection());
int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
try {
- submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper);
+ submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime);
boolean clearedCache = false;
for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
@@ -1357,6 +1359,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
Pair<Scan,Future<PeekingResultIterator>> scanPair = scanPairItr.next();
try {
long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
+ if (forTestingSetTimeoutToMaxToLetQueryPassHere) {
+ timeOutForScan = Long.MAX_VALUE;
+ }
if (timeOutForScan < 0) {
throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
". Query couldn't be completed in the allotted time: "
@@ -1598,7 +1603,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
abstract protected String getName();
abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException;
+ Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper,
+ long maxQueryEndTime) throws SQLException;
@Override
public int size() {
@@ -1710,4 +1716,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return this.estimateInfoTimestamp;
}
+ /**
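+ * Test-only hook used to verify that query timeouts are enforced in ScanningResultIterator.
+ * @param setTimeoutToMax when true, the per-scan timeout computed in BaseResultIterators is forced to
+ * Long.MAX_VALUE so that the timeout check there does not fire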
+ */
+ @VisibleForTesting
+ public static void setForTestingSetTimeoutToMaxToLetQueryPassHere(boolean setTimeoutToMax) {
+ forTestingSetTimeoutToMaxToLetQueryPassHere = setTimeoutToMax;
+ }
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index f584e12f6b..b4940c2cfa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -163,10 +163,11 @@ public class ChunkedResultIterator implements PeekingResultIterator {
ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
context.getConnection().getLogLevel());
long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+ //Chunking is deprecated, putting max value for timeout here.
ResultIterator singleChunkResultIterator =
new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
scanMetricsHolder, renewLeaseThreshold, plan,
- DefaultParallelScanGrouper.getInstance()), chunkSize);
+ DefaultParallelScanGrouper.getInstance(), Long.MAX_VALUE), chunkSize);
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
}
return resultIterator;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index 44c714f61d..1008b1b939 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -33,9 +33,10 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac
@Override
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
- QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+ QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches,
+ long maxQueryEndTime) throws SQLException {
return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold,
- plan, scanGrouper, caches);
+ plan, scanGrouper, caches, maxQueryEndTime);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 828de398f4..5b0b04b05f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -80,7 +80,8 @@ public class ParallelIterators extends BaseResultIterators {
@Override
protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
+ final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper,
+ long maxQueryEndTime) throws SQLException {
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor
@@ -116,7 +117,7 @@ public class ParallelIterators extends BaseResultIterators {
final TableResultIterator tableResultItr =
context.getConnection().getTableResultIteratorFactory().newIterator(
mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan,
- scanGrouper, caches);
+ scanGrouper, caches, maxQueryEndTime);
context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 51d7bf5dc5..8189fbcef9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRI
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.apache.phoenix.exception.SQLExceptionCode.OPERATION_TIMED_OUT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_IN_REMOTE_RESULTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS;
@@ -62,6 +63,7 @@ import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,14 +79,16 @@ public class ScanningResultIterator implements ResultIterator {
private static boolean throwExceptionIfScannerClosedForceFully = false;
private final boolean isMapReduceContext;
+ private final long maxQueryEndTime;
- public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext) {
+ public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime) {
this.scanner = scanner;
this.scanMetricsHolder = scanMetricsHolder;
this.context = context;
scanMetricsUpdated = false;
scanMetricsEnabled = scan.isScanMetricsEnabled();
this.isMapReduceContext = isMapReduceContext;
+ this.maxQueryEndTime = maxQueryEndTime;
}
@Override
@@ -171,6 +175,12 @@ public class ScanningResultIterator implements ResultIterator {
try {
Result result = scanner.next();
while (result != null && (result.isEmpty() || isDummy(result))) {
+ long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
+ if (timeOutForScan < 0) {
+ throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
+ ". Query couldn't be completed in the allotted time : "
+ + context.getStatement().getQueryTimeoutInMillis() + " ms").build().buildException();
+ }
if (!isMapReduceContext && (context.getConnection().isClosing() || context.getConnection().isClosed())) {
LOG.warn("Closing ResultScanner as Connection is already closed or in middle of closing");
if (throwExceptionIfScannerClosedForceFully) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 8674f3f2a6..ff394a3687 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -81,7 +81,7 @@ public class SerialIterators extends BaseResultIterators {
@Override
protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper) {
+ final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper, long maxQueryEndTime) {
ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
final String tableName = tableRef.getTable().getPhysicalName().getString();
final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
@@ -100,7 +100,7 @@ public class SerialIterators extends BaseResultIterators {
Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
@Override
public PeekingResultIterator call() throws Exception {
- PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches);
+ PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches, maxQueryEndTime);
return itr;
}
@@ -143,14 +143,16 @@ public class SerialIterators extends BaseResultIterators {
private PeekingResultIterator currentIterator;
private Integer remainingOffset;
private Map<ImmutableBytesPtr,ServerCache> caches;
+ private final long maxQueryEndTime;
- private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
+ private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException {
this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size());
this.tableName = tableName;
this.renewLeaseThreshold = renewLeaseThreshold;
this.scans.addAll(flattenedScans);
this.remainingOffset = offset;
this.caches = caches;
+ this.maxQueryEndTime = maxQueryEndTime;
if (this.remainingOffset != null) {
// mark the last scan for offset purposes
this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
@@ -183,7 +185,7 @@ public class SerialIterators extends BaseResultIterators {
context.getConnection().getLogLevel());
TableResultIterator itr =
new TableResultIterator(mutationState, currentScan, scanMetricsHolder,
- renewLeaseThreshold, plan, scanGrouper, caches);
+ renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
Tuple tuple;
if ((tuple = peekingItr.peek()) == null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 5e7e2cfe8f..9ef7eb8200 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -100,6 +100,7 @@ public class TableResultIterator implements ResultIterator {
private HashCacheClient hashCacheClient;
private final boolean isMapReduceContext;
+ private final long maxQueryEndTime;
@VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
TableResultIterator() {
@@ -112,6 +113,7 @@ public class TableResultIterator implements ResultIterator {
this.caches = null;
this.retry = 0;
this.isMapReduceContext = false;
+ this.maxQueryEndTime = Long.MAX_VALUE;
}
public static enum RenewLeaseStatus {
@@ -119,23 +121,23 @@ public class TableResultIterator implements ResultIterator {
};
public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
- long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
- this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, false);
+ long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, long maxQueryEndTime) throws SQLException {
+ this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, false, maxQueryEndTime);
}
public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
- long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
- this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, false);
+ long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches, long maxQueryEndTime) throws SQLException {
+ this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, false, maxQueryEndTime);
}
public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
- long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, boolean isMapReduceContext) throws SQLException {
- this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, isMapReduceContext);
+ long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, boolean isMapReduceContext, long maxQueryEndTime) throws SQLException {
+ this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null, isMapReduceContext, maxQueryEndTime);
}
public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper,Map<ImmutableBytesPtr,ServerCache> caches,
- boolean isMapReduceContext) throws SQLException {
+ boolean isMapReduceContext, long maxQueryEndTime) throws SQLException {
this.scan = scan;
this.scanMetricsHolder = scanMetricsHolder;
this.plan = plan;
@@ -149,6 +151,7 @@ public class TableResultIterator implements ResultIterator {
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
.getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
this.isMapReduceContext = isMapReduceContext;
+ this.maxQueryEndTime = maxQueryEndTime;
ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection());
}
@@ -246,7 +249,7 @@ public class TableResultIterator implements ResultIterator {
if (delegate == UNINITIALIZED_SCANNER) {
try {
this.scanIterator =
- new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder, plan.getContext(), isMapReduceContext);
+ new ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder, plan.getContext(), isMapReduceContext, maxQueryEndTime);
} catch (IOException e) {
Closeables.closeQuietly(htable);
throw ServerUtil.parseServerException(e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 0b28d5a2f2..fb573bfb23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.schema.TableRef;
public interface TableResultIteratorFactory {
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
- QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException;
+ QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches,
+ long maxQueryEndTime) throws SQLException;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index da3f6becc1..b047dbccb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -77,8 +77,9 @@ public class TableSnapshotResultIterator implements ResultIterator {
private StatementContext context;
private final boolean isMapReduceContext;
+ private final long maxQueryEndTime;
- public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext)
+ public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime)
throws IOException {
this.configuration = configuration;
this.currentRegion = -1;
@@ -97,6 +98,7 @@ public class TableSnapshotResultIterator implements ResultIterator {
this.rootDir = CommonFSUtils.getRootDir(configuration);
this.fs = rootDir.getFileSystem(configuration);
this.isMapReduceContext = isMapReduceContext;
+ this.maxQueryEndTime = maxQueryEndTime;
init();
}
@@ -158,7 +160,7 @@ public class TableSnapshotResultIterator implements ResultIterator {
RegionInfo hri = regions.get(this.currentRegion);
this.scanIterator =
new ScanningResultIterator(new SnapshotScanner(configuration, fs, restoreDir, htd, hri, scan),
- scan, scanMetricsHolder, context, isMapReduceContext);
+ scan, scanMetricsHolder, context, isMapReduceContext, maxQueryEndTime);
} catch (Throwable e) {
throw ServerUtil.parseServerException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 970eb6f074..014e47b4e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,6 +132,9 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
// For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+ // Compute the absolute query deadline: current time plus the statement's query timeout
+ final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ final long maxQueryEndTime = startTime + queryPlan.getContext().getStatement().getQueryTimeoutInMillis();
PeekingResultIterator peekingResultIterator;
ScanMetricsHolder scanMetricsHolder =
ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
@@ -138,7 +142,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
if (snapshotName != null) {
// result iterator to read snapshots
final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan,
- scanMetricsHolder, queryPlan.getContext(), true);
+ scanMetricsHolder, queryPlan.getContext(), true, maxQueryEndTime);
peekingResultIterator = LookAheadResultIterator.wrap(tableSnapshotResultIterator);
LOGGER.info("Adding TableSnapshotResultIterator for scan: " + scan);
} else {
@@ -146,7 +150,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
new TableResultIterator(
queryPlan.getContext().getConnection().getMutationState(), scan,
scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
- this.scanGrouper, true);
+ this.scanGrouper, true, maxQueryEndTime);
peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
LOGGER.info("Adding TableResultIterator for scan: " + scan);
}