You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/21 06:49:08 UTC
[1/2] phoenix git commit: PHOENIX-1463 phoenix.query.timeoutMs
doesn't work as expected (Samarth Jain)
Repository: phoenix
Updated Branches:
refs/heads/4.2 4c47b0e40 -> 1a4c4300a
PHOENIX-1463 phoenix.query.timeoutMs doesn't work as expected (Samarth Jain)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1a4c4300
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1a4c4300
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1a4c4300
Branch: refs/heads/4.2
Commit: 1a4c4300a7a9f4c52eef2ba6dba264ec8385c8e0
Parents: a1ba9ba
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Nov 20 20:57:22 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Nov 20 21:48:57 2014 -0800
----------------------------------------------------------------------
.../phoenix/exception/SQLExceptionCode.java | 1 +
.../phoenix/iterate/BaseResultIterators.java | 53 ++++++++++++++------
2 files changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a4c4300/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index bf13eec..9fb456b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -289,6 +289,7 @@ public enum SQLExceptionCode {
OUTDATED_JARS(2007, "INT09", "Outdated jars."),
INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. "),
UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code"),
+ OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out")
;
private final int errorCode;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1a4c4300/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c873494..446a182 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -46,6 +47,8 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.parse.FilterableStatement;
@@ -513,20 +516,26 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(numScans);
allFutures.add(futures);
SQLException toThrow = null;
+ int queryTimeOut = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
+ long maxQueryEndTime = System.currentTimeMillis() + queryTimeOut;
try {
submitWork(scans, futures, allIterators, splits.size());
- int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS);
boolean clearedCache = false;
for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
for (Pair<Scan,Future<PeekingResultIterator>> scanPair : reverseIfNecessary(future,isReverse)) {
try {
- PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ long timeOutForScan = maxQueryEndTime - System.currentTimeMillis();
+ if (timeOutForScan < 0) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException();
+ }
+ PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
concatIterators.add(iterator);
} catch (ExecutionException e) {
try { // Rethrow as SQLException
throw ServerUtil.parseServerException(e);
- } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date
+ } catch (StaleRegionBoundaryCacheException e2) {
+ // Catch only to try to recover from region boundary cache being out of date
List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2);
if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
services.clearTableRegionCache(physicalTableName);
@@ -551,7 +560,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Immediate do a get (not catching exception again) and then add the iterators we
// get back immediately. They'll be sorted as expected, since they're replacing the
// original one.
- PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS);
+ long timeOutForScan = maxQueryEndTime - System.currentTimeMillis();
+ if (timeOutForScan < 0) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException();
+ }
+ PeekingResultIterator iterator = newScanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
iterators.add(iterator);
}
}
@@ -563,6 +576,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
success = true;
return iterators;
+ } catch (TimeoutException e) {
+ // thrown when a thread times out waiting for the future.get() call to return
+ toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT)
+ .setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms")
+ .setRootCause(e).build().buildException();
} catch (SQLException e) {
toThrow = e;
} catch (Exception e) {
@@ -605,22 +623,25 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Don't call cancel on already started work, as it causes the HConnection
// to get into a funk. Instead, just cancel queued work.
boolean cancelledWork = false;
- for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
- for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
- for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
- // When work is rejected, we may have null futurePair entries, because
- // we randomize these and set them as they're submitted.
- if (futurePair != null) {
- Future<PeekingResultIterator> future = futurePair.getSecond();
- if (future != null) {
- cancelledWork |= future.cancel(false);
+ try {
+ for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
+ for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
+ for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) {
+ // When work is rejected, we may have null futurePair entries, because
+ // we randomize these and set them as they're submitted.
+ if (futurePair != null) {
+ Future<PeekingResultIterator> future = futurePair.getSecond();
+ if (future != null) {
+ cancelledWork |= future.cancel(false);
+ }
}
}
}
}
- }
- if (cancelledWork) {
- context.getConnection().getQueryServices().getExecutor().purge();
+ } finally {
+ if (cancelledWork) {
+ context.getConnection().getQueryServices().getExecutor().purge();
+ }
}
}
[2/2] phoenix git commit: PHOENIX-1466 Prevent multiple scans when
query run serially
Posted by ja...@apache.org.
PHOENIX-1466 Prevent multiple scans when query run serially
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a1ba9bac
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a1ba9bac
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a1ba9bac
Branch: refs/heads/4.2
Commit: a1ba9bace1f44ed418bc01467a10ab2b3eea848b
Parents: 4c47b0e
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Nov 18 10:37:28 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Nov 20 21:48:57 2014 -0800
----------------------------------------------------------------------
.../phoenix/end2end/QueryWithLimitIT.java | 119 +++++++++++++++++++
.../org/apache/phoenix/execute/ScanPlan.java | 81 +++++++------
.../DistinctValueWithCountServerAggregator.java | 5 -
.../iterate/ParallelIteratorFactory.java | 7 ++
.../apache/phoenix/iterate/SerialIterators.java | 9 +-
.../phoenix/iterate/TableResultIterator.java | 42 +++++--
6 files changed, 206 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
new file mode 100644
index 0000000..2df9514
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithLimitIT.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.KEYONLY_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class QueryWithLimitIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ // Must update config before starting server
+ props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50));
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1));
+ props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+ props.put(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, Integer.toString(0)); // Prevents RejectedExecutionException when deleting sequences
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testQueryWithLimitAndStats() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ ensureTableCreated(getUrl(),KEYONLY_NAME);
+ initTableValues(conn, 100);
+
+ String query = "SELECT i1 FROM KEYONLY LIMIT 1";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" +
+ " SERVER FILTER BY PageFilter 1\n" +
+ " SERVER 1 ROW LIMIT\n" +
+ "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs));
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testQueryWithoutLimitFails() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+
+ ensureTableCreated(getUrl(),KEYONLY_NAME);
+ initTableValues(conn, 100);
+ conn.createStatement().execute("UPDATE STATISTICS " + KEYONLY_NAME);
+
+ String query = "SELECT i1 FROM KEYONLY";
+ try {
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ rs.next();
+ fail();
+ } catch (SQLException e) {
+ assertTrue(e.getCause() instanceof RejectedExecutionException);
+ }
+ conn.close();
+ }
+
+ protected static void initTableValues(Connection conn, int nRows) throws Exception {
+ PreparedStatement stmt = conn.prepareStatement(
+ "upsert into " +
+ "KEYONLY VALUES (?, ?)");
+ for (int i = 0; i < nRows; i++) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, i+1);
+ stmt.execute();
+ }
+
+ conn.commit();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 4ca0338..d82e8f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -53,6 +53,7 @@ import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,10 +73,10 @@ public class ScanPlan extends BaseQueryPlan {
private List<List<Scan>> scans;
private boolean allowPageFilter;
- public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
+ public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
parallelIteratorFactory != null ? parallelIteratorFactory :
- buildResultIteratorFactory(context, table, orderBy));
+ buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter));
this.allowPageFilter = allowPageFilter;
if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
@@ -84,9 +85,51 @@ public class ScanPlan extends BaseQueryPlan {
}
}
+ private static boolean isSerial(StatementContext context,
+ TableRef tableRef, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
+ Scan scan = context.getScan();
+ /*
+ * If a limit is provided and we have no filter, run the scan serially when we estimate that
+ * the limit's worth of data will fit into a single region.
+ */
+ boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+ Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
+ if (perScanLimit == null || scan.getFilter() != null) {
+ return false;
+ }
+ PTable table = tableRef.getTable();
+ GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
+ long estRowSize = SchemaUtil.estimateRowSize(table);
+ long estRegionSize;
+ if (gpsInfo == null) {
+ // Use guidepost depth as minimum size
+ ConnectionQueryServices services = context.getConnection().getQueryServices();
+ HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes());
+ int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+ long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+ estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
+ } else {
+ // Region size estimated based on total number of bytes divided by number of regions
+ estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
+ }
+ // TODO: configurable number of bytes?
+ boolean isSerial = (perScanLimit * estRowSize < estRegionSize);
+
+ if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("With LIMIT=" + perScanLimit
+ + ", estimated row size=" + estRowSize
+ + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)"
+ + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution", context.getConnection()));
+ return isSerial;
+ }
+
private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context,
- TableRef table, OrderBy orderBy) {
+ TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException {
+ if (isSerial(context, table, orderBy, limit, allowPageFilter)) {
+ return ParallelIteratorFactory.NOOP_FACTORY;
+ }
ParallelIteratorFactory spoolingResultIteratorFactory =
new SpoolingResultIterator.SpoolingResultIteratorFactory(
context.getConnection().getQueryServices());
@@ -125,38 +168,8 @@ public class ScanPlan extends BaseQueryPlan {
* limit is provided, run query serially.
*/
boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
- boolean isSerial = false;
+ boolean isSerial = isSerial(context, tableRef, orderBy, limit, allowPageFilter);
Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
- /*
- * If a limit is provided and we have no filter, run the scan serially when we estimate that
- * the limit's worth of data will fit into a single region.
- */
- if (perScanLimit != null && scan.getFilter() == null) {
- GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
- long estRowSize = SchemaUtil.estimateRowSize(table);
- long estRegionSize;
- if (gpsInfo == null) {
- // Use guidepost depth as minimum size
- ConnectionQueryServices services = context.getConnection().getQueryServices();
- HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes());
- int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
- long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
- estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc);
- } else {
- // Region size estimated based on total number of bytes divided by number of regions
- estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1);
- }
- // TODO: configurable number of bytes?
- if (perScanLimit * estRowSize < estRegionSize) {
- isSerial = true;
- }
- if (logger.isDebugEnabled()) logger.debug("With LIMIT=" + perScanLimit
- + ", estimated row size=" + estRowSize
- + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)"
- + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution");
- }
ResultIterators iterators;
if (isSerial) {
iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index a3141b1..3a1789b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -24,9 +24,6 @@ import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -35,7 +32,6 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.SizedUtil;
-
import org.iq80.snappy.Snappy;
/**
@@ -45,7 +41,6 @@ import org.iq80.snappy.Snappy;
* @since 1.2.1
*/
public class DistinctValueWithCountServerAggregator extends BaseAggregator {
- private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index 1ad3af0..df8f658 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -23,5 +23,12 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.StatementContext;
public interface ParallelIteratorFactory {
+ public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
+ @Override
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan)
+ throws SQLException {
+ return LookAheadResultIterator.wrap(scanner);
+ }
+ };
PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 4be7b56..c01a268 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -49,14 +49,12 @@ public class SerialIterators extends BaseResultIterators {
private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class);
private static final String NAME = "SERIAL";
private final ParallelIteratorFactory iteratorFactory;
- private final int limit;
public SerialIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
throws SQLException {
super(plan, perScanLimit);
Preconditions.checkArgument(perScanLimit != null); // must be a limit specified
this.iteratorFactory = iteratorFactory;
- this.limit = perScanLimit;
}
@Override
@@ -88,9 +86,8 @@ public class SerialIterators extends BaseResultIterators {
concatIterators.add(iteratorFactory.newIterator(context, scanner, scan));
}
PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
- PeekingResultIterator iterator = new LimitingPeekingResultIterator(concatIterator, limit);
- allIterators.add(iterator);
- return iterator;
+ allIterators.add(concatIterator);
+ return concatIterator;
}
/**
@@ -102,7 +99,7 @@ public class SerialIterators extends BaseResultIterators {
public Object getJobId() {
return SerialIterators.this;
}
- }, "Parallel scanner for table: " + tableRef.getTable().getName().getString()));
+ }, "Serial scanner for table: " + tableRef.getTable().getName().getString()));
// Add our singleton Future which will execute serially
nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a1ba9bac/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 58abec5..9cc4ad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -40,28 +39,48 @@ import org.apache.phoenix.util.ServerUtil;
* @since 0.1
*/
public class TableResultIterator extends ExplainTable implements ResultIterator {
+ private final Scan scan;
private final HTableInterface htable;
- private final ResultIterator delegate;
+ private volatile ResultIterator delegate;
public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException {
this(context, tableRef, context.getScan());
}
+ /*
+ * Delay the creation of the underlying HBase ResultScanner if creationMode is DELAYED.
+ * Though no rows are returned when the scanner is created, it still makes several RPCs
+ * to open the scanner. In queries run serially (i.e. SELECT ... LIMIT 1), we do not
+ * want to be hit with this cost when it's likely we'll never execute those scanners.
+ */
+ private ResultIterator getDelegate(boolean isClosing) throws SQLException {
+ ResultIterator delegate = this.delegate;
+ if (delegate == null) {
+ synchronized (this) {
+ delegate = this.delegate;
+ if (delegate == null) {
+ try {
+ this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan));
+ } catch (IOException e) {
+ Closeables.closeQuietly(htable);
+ throw ServerUtil.parseServerException(e);
+ }
+ }
+ }
+ }
+ return delegate;
+ }
+
public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException {
super(context, tableRef);
+ this.scan = scan;
htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes());
- try {
- delegate = new ScanningResultIterator(htable.getScanner(scan));
- } catch (IOException e) {
- Closeables.closeQuietly(htable);
- throw ServerUtil.parseServerException(e);
- }
}
@Override
public void close() throws SQLException {
try {
- delegate.close();
+ getDelegate(true).close();
} finally {
try {
htable.close();
@@ -73,7 +92,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
@Override
public Tuple next() throws SQLException {
- return delegate.next();
+ return getDelegate(false).next();
}
@Override
@@ -84,7 +103,6 @@ public class TableResultIterator extends ExplainTable implements ResultIterator
@Override
public String toString() {
- return "TableResultIterator [htable=" + htable + ", delegate="
- + delegate + "]";
+ return "TableResultIterator [htable=" + htable + ", scan=" + scan + "]";
}
}