Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/21 09:02:19 UTC
git commit: PHOENIX-1188 Performance regression for non-aggregate queries
Repository: phoenix
Updated Branches:
refs/heads/3.0 5ca432b2d -> 6d1476225
PHOENIX-1188 Performance regression for non-aggregate queries
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6d147622
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6d147622
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6d147622
Branch: refs/heads/3.0
Commit: 6d1476225fedf58c46c2263344462aa98fc2d9ff
Parents: 5ca432b
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Aug 21 00:04:12 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Aug 21 00:04:12 2014 -0700
----------------------------------------------------------------------
.../phoenix/iterate/ChunkedResultIterator.java | 83 +++++++-------------
.../phoenix/query/QueryServicesOptions.java | 9 ++-
2 files changed, 35 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d147622/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index d7fbe79..c702e99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -29,10 +29,13 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
* basic scan plans, to avoid loading large quantities of data from HBase in one go.
@@ -41,7 +44,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory;
- private SingleChunkResultIterator singleChunkResultIterator;
+ private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
private final StatementContext context;
private final TableRef tableRef;
private Scan scan;
@@ -71,12 +74,19 @@ public class ChunkedResultIterator implements PeekingResultIterator {
}
public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory,
- StatementContext context, TableRef tableRef, Scan scan, long chunkSize) {
+ StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException {
this.delegateIteratorFactory = delegateIteratorFactory;
this.context = context;
this.tableRef = tableRef;
this.scan = scan;
this.chunkSize = chunkSize;
+ // Instantiate single chunk iterator and the delegate iterator in constructor
+ // to get parallel scans kicked off in separate threads. If we delay this,
+ // we'll get serialized behavior (see PHOENIX-
+ if (logger.isDebugEnabled()) logger.debug("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
+ ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
+ new TableResultIterator(context, tableRef, scan), chunkSize);
+ resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
}
@Override
@@ -96,26 +106,16 @@ public class ChunkedResultIterator implements PeekingResultIterator {
@Override
public void close() throws SQLException {
- if (resultIterator != null) {
- resultIterator.close();
- }
- if (singleChunkResultIterator != null) {
- singleChunkResultIterator.close();
- }
+ resultIterator.close();
}
private PeekingResultIterator getResultIterator() throws SQLException {
- if (resultIterator == null) {
- if (logger.isDebugEnabled()) logger.debug("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
- singleChunkResultIterator = new SingleChunkResultIterator(
- new TableResultIterator(context, tableRef, scan), chunkSize);
- resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
- } else if (resultIterator.peek() == null && !singleChunkResultIterator.isEndOfStreamReached()) {
- singleChunkResultIterator.close();
+ if (resultIterator.peek() == null && lastKey != null) {
+ resultIterator.close();
scan = ScanUtil.newScan(scan);
- scan.setStartRow(Bytes.add(singleChunkResultIterator.getLastKey(), new byte[]{0}));
+ scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
if (logger.isDebugEnabled()) logger.debug("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan);
- singleChunkResultIterator = new SingleChunkResultIterator(
+ ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
new TableResultIterator(context, tableRef, scan), chunkSize);
resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan);
}
@@ -125,23 +125,22 @@ public class ChunkedResultIterator implements PeekingResultIterator {
/**
* ResultIterator that runs over a single chunk of results (i.e. a portion of a scan).
*/
- private static class SingleChunkResultIterator implements ResultIterator {
+ private class SingleChunkResultIterator implements ResultIterator {
private int rowCount = 0;
private boolean chunkComplete;
- private boolean endOfStreamReached;
- private Tuple lastTuple;
private final ResultIterator delegate;
private final long chunkSize;
private SingleChunkResultIterator(ResultIterator delegate, long chunkSize) {
+ Preconditions.checkArgument(chunkSize > 0);
this.delegate = delegate;
this.chunkSize = chunkSize;
}
@Override
public Tuple next() throws SQLException {
- if (isChunkComplete() || isEndOfStreamReached()) {
+ if (chunkComplete || lastKey == null) {
return null;
}
Tuple next = delegate.next();
@@ -150,14 +149,15 @@ public class ChunkedResultIterator implements PeekingResultIterator {
// necessary for (at least) hash joins, as they can return multiple rows with the
// same row key. Stopping a chunk at a row key boundary is necessary in order to
// be able to start the next chunk on the next row key
- if (rowCount >= chunkSize && rowKeyChanged(lastTuple, next)) {
+ if (rowCount == chunkSize) {
+ next.getKey(lastKey);
+ } else if (rowCount > chunkSize && rowKeyChanged(next)) {
chunkComplete = true;
return null;
}
- lastTuple = next;
rowCount++;
} else {
- endOfStreamReached = true;
+ lastKey = null;
}
return next;
}
@@ -172,36 +172,13 @@ public class ChunkedResultIterator implements PeekingResultIterator {
delegate.close();
}
- /**
- * Returns true if the current chunk has been fully iterated over.
- */
- public boolean isChunkComplete() {
- return chunkComplete;
- }
-
- /**
- * Returns true if the end of all chunks has been reached.
- */
- public boolean isEndOfStreamReached() {
- return endOfStreamReached;
- }
-
- /**
- * Returns the last-encountered key.
- */
- public byte[] getLastKey() {
- ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
- lastTuple.getKey(keyPtr);
- return keyPtr.get();
- }
-
- private boolean rowKeyChanged(Tuple lastTuple, Tuple newTuple) {
- ImmutableBytesWritable oldKeyPtr = new ImmutableBytesWritable();
- ImmutableBytesWritable newKeyPtr = new ImmutableBytesWritable();
- lastTuple.getKey(oldKeyPtr);
- newTuple.getKey(newKeyPtr);
+ private boolean rowKeyChanged(Tuple newTuple) {
+ byte[] currentKey = lastKey.get();
+ int offset = lastKey.getOffset();
+ int length = lastKey.getLength();
+ newTuple.getKey(lastKey);
- return oldKeyPtr.compareTo(newKeyPtr) != 0;
+ return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0;
}
}
}
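The core of the fix above is twofold: the first chunk's scan is now kicked off in the constructor, so parallel scans start immediately instead of serializing, and the chunk boundary is tracked with a single reusable lastKey pointer instead of holding the last Tuple. When a chunk fills, rows keep flowing until the row key changes; the first row with a new key is discarded, and the next scan restarts inclusively at that key, which also removes the old trailing-zero-byte trick (Bytes.add(..., new byte[]{0})) for building an exclusive start row. Below is a minimal, self-contained sketch of that resume pattern, assuming a key-ordered source that supports inclusive restarts; KeyedRow, RowSource, and ChunkedReader are hypothetical stand-ins, not Phoenix classes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    final class KeyedRow {
        final byte[] key;
        KeyedRow(byte[] key) { this.key = key; }
    }

    interface RowSource {
        /** Rows with key >= startKey, in ascending key order. */
        Iterator<KeyedRow> scanFrom(byte[] startKey);
    }

    final class ChunkedReader {
        private final RowSource source;
        private final int chunkSize;
        private byte[] resumeKey = new byte[0]; // null once the source is exhausted

        ChunkedReader(RowSource source, int chunkSize) {
            if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be > 0");
            this.source = source;
            this.chunkSize = chunkSize;
        }

        /** Reads one chunk, stopping only at a row-key boundary. */
        List<KeyedRow> nextChunk() {
            if (resumeKey == null) {
                return Collections.emptyList();
            }
            List<KeyedRow> chunk = new ArrayList<>();
            Iterator<KeyedRow> it = source.scanFrom(resumeKey);
            byte[] boundaryKey = null;
            while (it.hasNext()) {
                KeyedRow row = it.next();
                if (chunk.size() >= chunkSize) {
                    if (boundaryKey == null) {
                        boundaryKey = row.key; // first row past the limit; keep it
                    } else if (!Arrays.equals(boundaryKey, row.key)) {
                        resumeKey = row.key;   // key changed: restart here, inclusively
                        return chunk;          // the current row is re-read next time
                    }
                }
                chunk.add(row);
            }
            resumeKey = null; // underlying scan exhausted
            return chunk;
        }
    }

Stopping only at a row-key boundary matters for plans like hash joins that can emit several rows per key; resuming inclusively re-reads just the single discarded boundary row rather than refilling a whole scanner cache to find the next key.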
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6d147622/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 99bd7ef..04f31e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -100,10 +100,11 @@ public class QueryServicesOptions {
public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 1024 * 1; // 1 Mb
public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 5;
public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
- // We make the default chunk size one row smaller than the default scan cache size because
- // one extra row is typically read and discarded by the ChunkedResultIterator, and we don't
- // want to fill up a whole new cache to read a single extra record
- public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = DEFAULT_SCAN_CACHE_SIZE - 1L;
+ // Only the first chunked batches are fetched in parallel, so this default
+ // should be on the relatively bigger side of things. Bigger means more
+ // latency and client-side spooling/buffering. Smaller means less initial
+ // latency and less parallelization.
+ public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 2999;
//
// Spillable GroupBy - SPGBY prefix
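For completeness, the chunk size is an ordinary query-services property, so a client can override the new 2999 default per connection rather than editing this constant. A hedged usage sketch; the property key "phoenix.query.scanResultChunkSize" and the JDBC URL are assumptions to verify against your Phoenix version:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    public class ChunkSizeOverride {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumed property key backing DEFAULT_SCAN_RESULT_CHUNK_SIZE;
            // smaller chunks trade parallel prefetching for lower initial latency.
            props.setProperty("phoenix.query.scanResultChunkSize", "1000");
            // Placeholder connection URL; substitute your ZooKeeper quorum.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:phoenix:localhost:2181", props)) {
                // Non-aggregate scans on this connection now chunk at ~1000 rows,
                // extended as needed to the next row-key boundary.
            }
        }
    }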