Posted to commits@phoenix.apache.org by ra...@apache.org on 2016/05/04 13:20:58 UTC
[1/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating through results handled correctly (Rajeshbabu)
Repository: phoenix
Updated Branches:
refs/heads/master 99713a61c -> d700c1f03
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 696f051..f9e9913 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -20,6 +20,9 @@ package org.apache.phoenix.util;
import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
import java.io.IOException;
import java.sql.SQLException;
@@ -32,6 +35,7 @@ import java.util.NavigableSet;
import java.util.TreeMap;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
@@ -103,6 +107,10 @@ public class ScanUtil {
return scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX) != null;
}
+ public static boolean isNonAggregateScan(Scan scan) {
+ return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+ }
+
// Use getTenantId and pass in column name to match against
// in as PSchema attribute. If column name matches in
// KeyExpressions, set on scan as attribute
@@ -618,6 +626,62 @@ public class ScanUtil {
}
}
+ /**
+ * Prefix the region start key to the start row/stop row suffixes and set them as the scan boundaries.
+ * @param scan
+ * @param lowerInclusiveRegionKey
+ * @param upperExclusiveRegionKey
+ */
+ public static void setupLocalIndexScan(Scan scan, byte[] lowerInclusiveRegionKey,
+ byte[] upperExclusiveRegionKey) {
+ byte[] prefix = lowerInclusiveRegionKey.length == 0 ? new byte[upperExclusiveRegionKey.length]: lowerInclusiveRegionKey;
+ int prefixLength = lowerInclusiveRegionKey.length == 0? upperExclusiveRegionKey.length: lowerInclusiveRegionKey.length;
+ if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) {
+ scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength));
+ }
+ if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) {
+ scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength));
+ }
+ }
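A local index row key is the current region's start key followed by the index-specific suffix, which is why the method above rebuilds the scan boundaries by prepending a prefix (zero-filled to the end key's length for the first region, whose start key is empty). The implementation of ScanRanges.prefixKey is not part of this diff; as a minimal sketch, assuming it behaves as its call sites here suggest, the concatenation could look like:

    // Hypothetical helper for illustration only; the real ScanRanges.prefixKey
    // may differ. Prepends the region-key prefix to the stored row-key suffix.
    static byte[] prefixKeySketch(byte[] suffix, int suffixOffset, byte[] prefix, int prefixLength) {
        byte[] result = new byte[prefixLength + suffix.length - suffixOffset];
        System.arraycopy(prefix, 0, result, 0, prefixLength);
        System.arraycopy(suffix, suffixOffset, result, prefixLength, suffix.length - suffixOffset);
        return result;
    }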
+
+ public static byte[] getActualStartRow(Scan localIndexScan, HRegionInfo regionInfo) {
+ return localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX) == null ? localIndexScan
+ .getStartRow() : ScanRanges.prefixKey(localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX), 0 ,
+ regionInfo.getStartKey().length == 0 ? new byte[regionInfo.getEndKey().length]
+ : regionInfo.getStartKey(),
+ regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
+ .getStartKey().length);
+ }
+
+ /**
+ * Set all required attributes and the scan boundaries for a local index scan.
+ * @param keyOffset
+ * @param regionStartKey
+ * @param regionEndKey
+ * @param newScan
+ */
+ public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) {
+ if(ScanUtil.isLocalIndex(newScan)) {
+ newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey);
+ newScan.setStartRow(regionStartKey);
+ newScan.setStopRow(regionEndKey);
+ if (keyOffset > 0 ) {
+ newScan.setAttribute(SCAN_START_ROW_SUFFIX, ScanRanges.stripPrefix(startRowSuffix, keyOffset));
+ } else {
+ newScan.setAttribute(SCAN_START_ROW_SUFFIX, startRowSuffix);
+ }
+ if (keyOffset > 0) {
+ newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, ScanRanges.stripPrefix(stopRowSuffix, keyOffset));
+ } else {
+ newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, stopRowSuffix);
+ }
+ }
+ }
+
+ public static boolean isContextScan(Scan scan, StatementContext context) {
+ return Bytes.compareTo(context.getScan().getStartRow(), scan.getStartRow()) == 0 && Bytes
+ .compareTo(context.getScan().getStopRow(), scan.getStopRow()) == 0;
+ }
public static int getRowKeyOffset(byte[] regionStartKey, byte[] regionEndKey) {
return regionStartKey.length > 0 ? regionStartKey.length : regionEndKey.length;
}
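A quick worked example of getRowKeyOffset, with keys invented for illustration: since local index rows are prefixed with the region start key (or, for the first region, a zero-filled prefix of end-key length, as in setupLocalIndexScan above), the offset is simply that prefix length.

    // Illustration only; the keys are invented.
    byte[] startKey = new byte[] { 0, 3 };
    byte[] endKey   = new byte[] { 0, 7 };
    int offset = startKey.length > 0 ? startKey.length : endKey.length; // == 2
    // For the first region, startKey = {} and endKey = {0, 7} also give offset == 2.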
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index d61e9fe..c021b2c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -752,7 +752,7 @@ public abstract class BaseTest {
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class,
LoadBalancer.class);
conf.setClass("hbase.coprocessor.regionserver.classes", LocalIndexMerger.class,
- RegionServerObserver.class);
+ RegionServerObserver.class) ;
conf.setInt("dfs.namenode.handler.count", 2);
conf.setInt("dfs.namenode.service.handler.count", 2);
conf.setInt("dfs.datanode.handler.count", 2);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 47101b2..1da68ba 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -386,7 +386,12 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
return ResultIterator.EMPTY_ITERATOR;
}
-
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return ResultIterator.EMPTY_ITERATOR;
+ }
+
@Override
public ResultIterator iterator() throws SQLException {
return ResultIterator.EMPTY_ITERATOR;
@@ -467,7 +472,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
return false;
}
- }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+ }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan());
List<KeyRange> keyRanges = parallelIterators.getSplits();
return keyRanges;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index f245840..5cdf234 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.hive.PhoenixRowKey;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -115,8 +116,8 @@ public class PhoenixRecordReader<T extends DBWritable> implements
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
.toBytes(true));
final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan
- .getContext().getConnection().getMutationState(), queryPlan.getTableRef(), scan,
- readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold);
+ .getContext().getConnection().getMutationState(), scan,
+ readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance() );
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap
(tableResultIterator);
[2/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating through results handled correctly (Rajeshbabu)
Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 8848efc..4c6b960 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
+import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -85,7 +87,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema";
public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin";
public static final String VIEW_CONSTANTS = "_ViewConstants";
- public static final String STARTKEY_OFFSET = "_StartKeyOffset";
public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
public static final String REVERSE_SCAN = "_ReverseScan";
public static final String ANALYZE_TABLE = "_ANALYZETABLE";
@@ -100,6 +101,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";
public final static String SCAN_OFFSET = "_RowOffset";
+ public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix";
+ public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix";
/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -135,7 +138,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
byte[] upperExclusiveRegionKey = region.getRegionInfo().getEndKey();
boolean isStaleRegionBoundaries;
if (isLocalIndex) {
- byte[] expectedUpperRegionKey = scan.getAttribute(EXPECTED_UPPER_REGION_KEY);
+ byte[] expectedUpperRegionKey =
+ scan.getAttribute(EXPECTED_UPPER_REGION_KEY) == null ? scan.getStopRow() : scan
+ .getAttribute(EXPECTED_UPPER_REGION_KEY);
isStaleRegionBoundaries = expectedUpperRegionKey != null &&
Bytes.compareTo(upperExclusiveRegionKey, expectedUpperRegionKey) != 0;
} else {
@@ -147,6 +152,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString());
throw new DoNotRetryIOException(cause.getMessage(), cause);
}
+ if(isLocalIndex) {
+ ScanUtil.setupLocalIndexScan(scan, lowerInclusiveRegionKey, upperExclusiveRegionKey);
+ }
}
abstract protected boolean isRegionObserverFor(Scan scan);
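Condensed into straight-line form, the stale-boundary check this hunk introduces for local indexes reads as follows (a restatement of the diff above, not new behavior):

    // The expected upper region key now defaults to the scan's stop row; any
    // mismatch with the region's actual end key means the client's cached
    // region boundaries are stale.
    byte[] expected = scan.getAttribute(EXPECTED_UPPER_REGION_KEY) != null
            ? scan.getAttribute(EXPECTED_UPPER_REGION_KEY) : scan.getStopRow();
    boolean isStaleRegionBoundaries = expected != null
            && Bytes.compareTo(upperExclusiveRegionKey, expected) != 0;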
@@ -166,7 +174,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
}
if (isRegionObserverFor(scan)) {
- if (! skipRegionBoundaryCheck(scan)) {
+ // For local indexes, we need to throw if the scan is out of the region, as we'd
+ // otherwise get inconsistent results. In other cases it may just mean our
+ // client-side cache of region boundaries is out of date and can safely be ignored.
+ if (!skipRegionBoundaryCheck(scan) || ScanUtil.isLocalIndex(scan)) {
throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());
}
// Muck with the start/stop row of the scan and set as reversed at the
@@ -227,6 +238,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
}
} catch (Throwable t) {
+ // If the exception is a NotServingRegionException, rethrow it as a
+ // StaleRegionBoundaryCacheException so the Phoenix client handles it; otherwise
+ // the HBase client may recreate scans with wrong region boundaries.
+ if(t instanceof NotServingRegionException) {
+ Exception cause = new StaleRegionBoundaryCacheException(c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ throw new DoNotRetryIOException(cause.getMessage(), cause);
+ }
ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), t);
return null; // impossible
}
@@ -280,6 +298,31 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
final ImmutableBytesWritable ptr) {
return new RegionScanner() {
+ private boolean hasReferences = checkForReferenceFiles();
+ private HRegionInfo regionInfo = c.getEnvironment().getRegionInfo();
+ private byte[] actualStartKey = getActualStartKey();
+
+ // If there are any reference files left after a local index region merge, in some
+ // cases we might get records whose row keys are less than the scan start row. This
+ // happens when we replace the actual region start key with the merged region's
+ // start key. This method reports whether the region has any reference files.
+ private boolean checkForReferenceFiles() {
+ if(!ScanUtil.isLocalIndex(scan)) return false;
+ for (byte[] family : scan.getFamilies()) {
+ if (c.getEnvironment().getRegion().getStore(family).hasReferences()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Get the actual scan start row of the local index. When there are reference
+ // files, it is used to detect result row keys that fall below the scan start row.
+ public byte[] getActualStartKey() {
+ return ScanUtil.isLocalIndex(scan) ? ScanUtil.getActualStartRow(scan, regionInfo)
+ : null;
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
try {
@@ -338,6 +381,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
arrayElementCell = result.get(arrayElementCellPosition);
}
if (ScanUtil.isLocalIndex(scan) && !ScanUtil.isAnalyzeTable(scan)) {
+ if(hasReferences && actualStartKey!=null) {
+ next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
+ null, arrayElementCell);
+ if (result.isEmpty()) {
+ return next;
+ }
+ }
IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
}
@@ -370,6 +420,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
arrayElementCell = result.get(arrayElementCellPosition);
}
if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) {
+ if(hasReferences && actualStartKey!=null) {
+ next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result,
+ scannerContext, arrayElementCell);
+ if (result.isEmpty()) {
+ return next;
+ }
+ }
IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
}
@@ -388,6 +445,37 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
}
}
+ /**
+ * When a merge is in progress while scanning local indexes, we might get key values
+ * whose row keys are less than the scan start row. In that case we need to keep
+ * scanning until we reach a row key greater than or equal to the scan start key.
+ * TODO try to fix this case in LocalIndexStoreFileScanner when there is a merge.
+ */
+ private boolean scanTillScanStartRow(final RegionScanner s,
+ final Set<KeyValueColumnExpression> arrayKVRefs,
+ final Expression[] arrayFuncRefs, List<Cell> result,
+ ScannerContext scannerContext, Cell arrayElementCell) throws IOException {
+ boolean next = true;
+ Cell firstCell = result.get(0);
+ while (Bytes.compareTo(firstCell.getRowArray(), firstCell.getRowOffset(),
+ firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0) {
+ result.clear();
+ if(scannerContext == null) {
+ next = s.nextRaw(result);
+ } else {
+ next = s.nextRaw(result, scannerContext);
+ }
+ if (result.isEmpty()) {
+ return next;
+ }
+ if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
+ int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+ arrayElementCell = result.get(arrayElementCellPosition);
+ }
+ firstCell = result.get(0);
+ }
+ return next;
+ }
+
private int replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
final Expression[] arrayFuncRefs, List<Cell> result) {
// make a copy of the results array here, as we're modifying it below
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 3237882..3b8efc3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -39,7 +39,6 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -64,6 +63,7 @@ import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.Closeables;
@@ -402,8 +402,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
Region region = c.getEnvironment().getRegion();
- region.startRegionOperation();
+ boolean acquiredLock = false;
try {
+ region.startRegionOperation();
+ acquiredLock = true;
synchronized (scanner) {
do {
List<Cell> results = new ArrayList<Cell>();
@@ -423,8 +425,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
}
} while (hasMore && groupByCache.size() < limit);
}
- } finally {
- region.closeRegionOperation();
+ } finally {
+ if (acquiredLock) region.closeRegionOperation();
}
RegionScanner regionScanner = groupByCache.getScanner(scanner);
@@ -472,8 +474,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
// start of a new row. Otherwise, we have to wait until an agg
int countOffset = rowAggregators.length == 0 ? 1 : 0;
Region region = c.getEnvironment().getRegion();
- region.startRegionOperation();
+ boolean acquiredLock = false;
try {
+ region.startRegionOperation();
+ acquiredLock = true;
synchronized (scanner) {
do {
List<Cell> kvs = new ArrayList<Cell>();
@@ -505,7 +509,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
} while (hasMore && !aggBoundary && !atLimit);
}
} finally {
- region.closeRegionOperation();
+ if (acquiredLock) region.closeRegionOperation();
}
if (currentKey != null) {
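The acquiredLock changes in this and the surrounding hunks all apply the same pattern: startRegionOperation() itself can throw (for example while the region is closing), so closeRegionOperation() must only run if the lock was actually taken. In isolation the pattern is:

    // Only release the region operation if it was successfully acquired.
    boolean acquiredLock = false;
    try {
        region.startRegionOperation(); // may throw
        acquiredLock = true;
        // ... iterate the scanner under the region operation ...
    } finally {
        if (acquiredLock) region.closeRegionOperation();
    }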
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 77b8b3e..8616b6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -288,8 +289,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
long rowCount = 0;
final RegionScanner innerScanner = theScanner;
- region.startRegionOperation();
+ boolean acquiredLock = false;
try {
+ region.startRegionOperation();
+ acquiredLock = true;
synchronized (innerScanner) {
do {
List<Cell> results = new ArrayList<Cell>();
@@ -529,7 +532,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
try {
innerScanner.close();
} finally {
- region.closeRegionOperation();
+ if (acquiredLock) region.closeRegionOperation();
}
}
if (logger.isDebugEnabled()) {
@@ -608,7 +611,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
InternalScanner internalScanner = scanner;
if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
try {
- Pair<HRegionInfo, HRegionInfo> mergeRegions = null;
long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index b60cd7e..94d1fc8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -126,7 +127,7 @@ public class AggregatePlan extends BaseQueryPlan {
this.services = services;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException {
Expression expression = RowKeyExpression.INSTANCE;
OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -143,9 +144,9 @@ public class AggregatePlan extends BaseQueryPlan {
this.outerFactory = outerFactory;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
- PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName);
- return outerFactory.newIterator(context, iterator, scan, tableName);
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException {
+ PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName, plan);
+ return outerFactory.newIterator(context, iterator, scan, tableName, plan);
}
}
@@ -169,12 +170,12 @@ public class AggregatePlan extends BaseQueryPlan {
}
@Override
- protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
if (groupBy.isEmpty()) {
- UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
+ UngroupedAggregateRegionObserver.serializeIntoScan(scan);
} else {
// Set attribute with serialized expressions for coprocessor
- GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), groupBy.getScanAttribName(), groupBy.getKeyExpressions());
+ GroupedAggregateRegionObserver.serializeIntoScan(scan, groupBy.getScanAttribName(), groupBy.getKeyExpressions());
if (limit != null && orderBy.getOrderByExpressions().isEmpty() && having == null
&& ( ( statement.isDistinct() && ! statement.isAggregate() )
|| ( ! statement.isDistinct() && ( context.getAggregationManager().isEmpty()
@@ -200,7 +201,7 @@ public class AggregatePlan extends BaseQueryPlan {
* order, so we can early exit, even when aggregate functions are used, as
* the rows in the group are contiguous.
*/
- context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT,
+ scan.setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT,
PInteger.INSTANCE.toBytes(limit + (offset == null ? 0 : offset)));
}
}
@@ -211,8 +212,8 @@ public class AggregatePlan extends BaseQueryPlan {
logger.warn("This query cannot be executed serially. Ignoring the hint");
}
BaseResultIterators iterators = hasSerialHint && canBeExecutedSerially
- ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper)
- : new ParallelIterators(this, null, wrapParallelIteratorFactory());
+ ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan)
+ : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan);
splits = iterators.getSplits();
scans = iterators.getScans();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index cedd23e..83e55ee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -203,26 +203,30 @@ public abstract class BaseQueryPlan implements QueryPlan {
@Override
public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
- return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper);
+ return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, this.context.getScan());
}
-
+
+ @Override
+ public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, scan);
+ }
+
@Override
public final ResultIterator iterator() throws SQLException {
- return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance());
+ return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance(), this.context.getScan());
}
- public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper) throws SQLException {
+ public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
if (context.getScanRanges() == ScanRanges.NOTHING) {
return ResultIterator.EMPTY_ITERATOR;
}
if (tableRef == TableRef.EMPTY_TABLE_REF) {
- return newIterator(scanGrouper);
+ return newIterator(scanGrouper, scan);
}
// Set miscellaneous scan attributes. This is the last chance to set them before we
// clone the scan for each parallelized chunk.
- Scan scan = context.getScan();
TableRef tableRef = context.getCurrentTable();
PTable table = tableRef.getTable();
@@ -319,7 +323,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection));
}
- ResultIterator iterator = newIterator(scanGrouper);
+ ResultIterator iterator = newIterator(scanGrouper, scan);
iterator = dependencies.isEmpty() ?
iterator : new DelegateResultIterator(iterator) {
@Override
@@ -448,7 +452,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
}
}
- abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException;
+ abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
@Override
public long getEstimatedSize() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index f4e374e..eb048f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -83,6 +84,11 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ return iterator(scanGrouper, null);
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
ResultIterator iterator = delegate.iterator(scanGrouper);
if (where != null) {
iterator = new FilterResultIterator(iterator, where);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 003c995..4e43225 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
@@ -50,7 +51,11 @@ public class ClientScanPlan extends ClientProcessingPlan {
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
- ResultIterator iterator = delegate.iterator(scanGrouper);
+ return iterator(scanGrouper, delegate.getContext().getScan());
+ }
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ ResultIterator iterator = delegate.iterator(scanGrouper, scan);
if (where != null) {
iterator = new FilterResultIterator(iterator, where);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 1b0af8c..fc5a04d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
@@ -103,7 +104,12 @@ public class CorrelatePlan extends DelegateQueryPlan {
}
@Override
- public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+ throws SQLException {
+ return iterator(scanGrouper, null);
+ }
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan)
throws SQLException {
return new ResultIterator() {
private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 36b725e..5887ff3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -52,7 +52,7 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
}
@Override
- protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 5fdec46..d332f68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -140,6 +140,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ return iterator(scanGrouper, this.delegate.getContext().getScan());
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
int count = subPlans.length;
PhoenixConnection connection = getContext().getConnection();
ConnectionQueryServices services = connection.getQueryServices();
@@ -216,11 +221,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
}
if (joinInfo != null) {
- Scan scan = delegate.getContext().getScan();
HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
}
- ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper);
+ ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper, scan) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper, scan);
if (statement.getInnerSelectStatement() != null && postFilter != null) {
iterator = new FilterResultIterator(iterator, postFilter);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index fe767d9..db99964 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -71,7 +71,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
}
@Override
- protected ResultIterator newIterator(ParallelScanGrouper scanGrouper)
+ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan)
throws SQLException {
ResultIterator scanner = new ResultIterator() {
private final Iterator<Tuple> tupleIterator = tuples.iterator();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 980db52..9fbdb3a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -200,9 +200,8 @@ public class ScanPlan extends BaseQueryPlan {
}
@Override
- protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
// Set any scan attributes before creating the scanner, as it will be too late afterwards
- Scan scan = context.getScan();
scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
ResultIterator scanner;
TableRef tableRef = this.getTableRef();
@@ -220,11 +219,11 @@ public class ScanPlan extends BaseQueryPlan {
BaseResultIterators iterators;
boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType());
if (isOffsetOnServer) {
- iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper);
+ iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan);
} else if (isSerial) {
- iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper);
+ iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan);
} else {
- iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper);
+ iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan);
}
splits = iterators.getSplits();
scans = iterators.getScans();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index e181e80..8e0e6e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -131,7 +131,12 @@ public class SortMergeJoinPlan implements QueryPlan {
}
@Override
- public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+ return iterator(scanGrouper, null);
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
return type == JoinType.Semi || type == JoinType.Anti ?
new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) :
new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index e8d9af0..0ba0cc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.expression.Expression;
@@ -60,7 +61,12 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
- ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper)) {
+ return iterator(scanGrouper, delegate.getContext().getScan());
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper, scan)) {
@Override
public Tuple next() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 808141e..cf95b5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -139,7 +139,11 @@ public class UnionPlan implements QueryPlan {
public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
return iterator(Collections.<SQLCloseable>emptyList());
}
-
+
+ @Override
+ public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return iterator(Collections.<SQLCloseable>emptyList());
+ }
@Override
public final ResultIterator iterator() throws SQLException {
return iterator(Collections.<SQLCloseable>emptyList());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 8905eef..94c59df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
@@ -53,7 +54,12 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
@Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
- return new UnnestArrayResultIterator(delegate.iterator(scanGrouper));
+ return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, delegate.getContext().getScan()));
+ }
+
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, scan));
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 043bd30..bee2201 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,8 +17,10 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
@@ -116,15 +118,16 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
private final List<KeyRange> splits;
private final PTableStats tableStats;
private final byte[] physicalTableName;
- private final QueryPlan plan;
+ protected final QueryPlan plan;
protected final String scanId;
protected final MutationState mutationState;
- private final ParallelScanGrouper scanGrouper;
+ protected final ParallelScanGrouper scanGrouper;
// TODO: too much nesting here - breakup into new classes.
private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
private Long estimatedRows;
private Long estimatedSize;
private boolean hasGuidePosts;
+ private Scan scan;
static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
@Override
@@ -138,7 +141,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
private boolean useStats() {
- Scan scan = context.getScan();
boolean isPointLookup = context.getScanRanges().isPointLookup();
/*
* Don't use guide posts if:
@@ -153,11 +155,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
return true;
}
- private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset) {
+ private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) {
StatementContext context = plan.getContext();
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
- Scan scan = context.getScan();
Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
// Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix their row keys
@@ -331,10 +332,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
}
- public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper) throws SQLException {
+ public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(),
plan.getStatement().getHint(), plan.getLimit(), plan instanceof ScanPlan ? plan.getOffset() : null);
this.plan = plan;
+ this.scan = scan;
this.scanGrouper = scanGrouper;
StatementContext context = plan.getContext();
// Clone MutationState as the one on the connection will change if auto commit is on
@@ -347,7 +349,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Used to tie all the scans together during logging
scanId = UUID.randomUUID().toString();
- initializeScan(plan, perScanLimit, offset);
+ initializeScan(plan, perScanLimit, offset, scan);
this.scans = getParallelScans();
List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
@@ -471,10 +473,69 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
private List<List<Scan>> getParallelScans() throws SQLException {
+ // If the scan boundaries do not match the scan in the context, we need to regenerate
+ // the parallel scans for this chunk after a split/merge.
+ if (!ScanUtil.isContextScan(scan, context)) {
+ return getParallelScans(scan);
+ }
return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
}
/**
+ * Get parallel scans for the specified scan boundaries. This can be used to regenerate
+ * parallel scans when a split or merge occurs while scanning a chunk, in which case we
+ * need not walk all of the regions or guideposts.
+ * @param scan
+ * @return
+ * @throws SQLException
+ */
+ private List<List<Scan>> getParallelScans(Scan scan) throws SQLException {
+ List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+ .getAllTableRegions(physicalTableName);
+ List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+ int regionIndex = 0;
+ int stopIndex = regionBoundaries.size();
+ if (scan.getStartRow().length > 0) {
+ regionIndex = getIndexContainingInclusive(regionBoundaries, scan.getStartRow());
+ }
+ if (scan.getStopRow().length > 0) {
+ stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), scan.getStopRow()));
+ }
+ List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+ List<Scan> scans = Lists.newArrayListWithExpectedSize(2);
+ while (regionIndex <= stopIndex) {
+ HRegionLocation regionLocation = regionLocations.get(regionIndex);
+ HRegionInfo regionInfo = regionLocation.getRegionInfo();
+ Scan newScan = ScanUtil.newScan(scan);
+ byte[] endKey;
+ if (regionIndex == stopIndex) {
+ endKey = scan.getStopRow();
+ } else {
+ endKey = regionBoundaries.get(regionIndex);
+ }
+ if(ScanUtil.isLocalIndex(scan)) {
+ ScanUtil.setLocalIndexAttributes(newScan, 0, regionInfo.getStartKey(),
+ regionInfo.getEndKey(), newScan.getAttribute(SCAN_START_ROW_SUFFIX),
+ newScan.getAttribute(SCAN_STOP_ROW_SUFFIX));
+ } else {
+ if(Bytes.compareTo(scan.getStartRow(), regionInfo.getStartKey())<=0) {
+ newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionInfo.getStartKey());
+ newScan.setStartRow(regionInfo.getStartKey());
+ }
+ if(scan.getStopRow().length == 0 || (regionInfo.getEndKey().length != 0 && Bytes.compareTo(scan.getStopRow(), regionInfo.getEndKey())>0)) {
+ newScan.setStopRow(regionInfo.getEndKey());
+ }
+ }
+ scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
+ regionIndex++;
+ }
+ if (!scans.isEmpty()) { // Add any remaining scans
+ parallelScans.add(scans);
+ }
+ return parallelScans;
+ }
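As a usage illustration with invented boundary keys (not from the commit): if a chunk was scanning ["b", "f") and its region has since split into ["a", "d") and ["d", "g"), the method above walks only the current region boundaries and yields per-region scans covering ["b", "d") and ["d", "f").

    // Hypothetical example; keys invented for illustration.
    Scan chunk = new Scan(Bytes.toBytes("b"), Bytes.toBytes("f"));
    // After the split, getParallelScans(chunk) re-chunks along the current
    // boundaries, producing scans roughly equivalent to ["b","d") and ["d","f").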
+
+ /**
* Compute the list of parallel scans to run for a given query. The inner scans
* may be concatenated together directly, while the other ones may need to be
* merge sorted, depending on the query.
@@ -482,7 +543,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
* @throws SQLException
*/
private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException {
- Scan scan = context.getScan();
List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
.getAllTableRegions(physicalTableName);
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
@@ -555,6 +615,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
// Merge bisect with guideposts for all but the last region
while (regionIndex <= stopIndex) {
+ HRegionLocation regionLocation = regionLocations.get(regionIndex);
+ HRegionInfo regionInfo = regionLocation.getRegionInfo();
byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
if (regionIndex == stopIndex) {
@@ -562,9 +624,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
} else {
endKey = regionBoundaries.get(regionIndex);
}
- HRegionLocation regionLocation = regionLocations.get(regionIndex);
if (isLocalIndex) {
- HRegionInfo regionInfo = regionLocation.getRegionInfo();
endRegionKey = regionInfo.getEndKey();
keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
}
@@ -572,6 +632,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
false);
+ if(newScan != null) {
+ ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
+ regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
+ }
if (newScan != null) {
estimatedRows += gps.getRowCounts().get(guideIndex);
estimatedSize += gps.getByteCounts().get(guideIndex);
@@ -584,12 +648,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
}
} catch (EOFException e) {}
Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true);
- if (isLocalIndex) {
- if (newScan != null) {
- newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
- } else if (!scans.isEmpty()) {
- scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
- }
+ if(newScan != null) {
+ ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
+ regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
}
scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation);
currentKeyBytes = endKey;
@@ -628,7 +689,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
*/
@Override
public List<PeekingResultIterator> getIterators() throws SQLException {
- Scan scan = context.getScan();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Getting iterators for " + this,
ScanUtil.getCustomAnnotations(scan)));
@@ -676,7 +736,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
SQLException toThrow = null;
int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
try {
- submitWork(scan, futures, allIterators, splitSize, isReverse);
+ submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper);
boolean clearedCache = false;
for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
@@ -687,15 +747,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException();
}
if (isLocalIndex && previousScan != null && previousScan.getScan() != null
- && ((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
+ && (((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
previousScan.getScan().getStopRow()) < 0)
|| (isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
previousScan.getScan().getStopRow()) > 0)
- || (scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY) != null
- && previousScan.getScan().getAttribute(EXPECTED_UPPER_REGION_KEY) != null
- && Bytes.compareTo(scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY),
- previousScan.getScan()
- .getAttribute(EXPECTED_UPPER_REGION_KEY)) == 0))) {
+ || (Bytes.compareTo(scanPair.getFirst().getStopRow(), previousScan.getScan().getStopRow()) == 0))
+ && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) {
continue;
}
PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
@@ -714,9 +771,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
Scan oldScan = scanPair.getFirst();
byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
byte[] endKey = oldScan.getStopRow();
- if (isLocalIndex) {
- endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
- }
List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey);
// Add any concatIterators that were successful so far
@@ -868,7 +922,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
abstract protected String getName();
abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException;
+ Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException;
@Override
public int size() {
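Note on the local-index handling above: instead of shipping complete row keys, the client now records the start/stop rows as region-relative suffixes (SCAN_START_ROW_SUFFIX / SCAN_STOP_ROW_SUFFIX) together with the actual start row, and the server rebuilds the real scan boundaries by prefixing the hosting region's start key (the patch does this through ScanUtil.setLocalIndexAttributes). A minimal sketch of just the prefixing idea — the class and method names here are hypothetical, not Phoenix APIs:

    // Illustrative only: rebuild a full row key from a region-relative suffix.
    final class LocalIndexKeys {
        static byte[] prefixWithRegionStart(byte[] regionStartKey, byte[] suffix) {
            byte[] actual = new byte[regionStartKey.length + suffix.length];
            System.arraycopy(regionStartKey, 0, actual, 0, regionStartKey.length);
            System.arraycopy(suffix, 0, actual, regionStartKey.length, suffix.length);
            return actual;
        }
    }

This is what keeps a local index scan valid across a split: the suffix stays the same while the prefix is supplied by whichever daughter region ends up serving the scan.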
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 7a830de..a12d40c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import java.sql.SQLException;
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.query.QueryServices;
@@ -62,6 +63,7 @@ public class ChunkedResultIterator implements PeekingResultIterator {
private final MutationState mutationState;
private Scan scan;
private PeekingResultIterator resultIterator;
+ private QueryPlan plan;
/**
* Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17,
@@ -84,30 +86,31 @@ public class ChunkedResultIterator implements PeekingResultIterator {
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException {
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
return new ChunkedResultIterator(delegateFactory, mutationState, context, tableRef, scan,
mutationState.getConnection().getQueryServices().getProps().getLong(
QueryServices.SCAN_RESULT_CHUNK_SIZE,
- QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
+ QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner, plan);
}
}
private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState,
- StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException {
+ StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException {
this.delegateIteratorFactory = delegateIteratorFactory;
this.context = context;
this.tableRef = tableRef;
this.scan = scan;
this.chunkSize = chunkSize;
this.mutationState = mutationState;
+ this.plan = plan;
// Instantiate single chunk iterator and the delegate iterator in constructor
// to get parallel scans kicked off in separate threads. If we delay this,
// we'll get serialized behavior (see PHOENIX-
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize);
String tableName = tableRef.getTable().getPhysicalName().getString();
- resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
+ resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
}
@Override
@@ -134,13 +137,20 @@ public class ChunkedResultIterator implements PeekingResultIterator {
if (resultIterator.peek() == null && lastKey != null) {
resultIterator.close();
scan = ScanUtil.newScan(scan);
- scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
+ if(ScanUtil.isLocalIndex(scan)) {
+ scan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.copyKeyBytesIfNecessary(lastKey));
+ } else {
+ scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
+ }
if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
String tableName = tableRef.getTable().getPhysicalName().getString();
long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
- ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(
- new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold), chunkSize);
- resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName);
+ ResultIterator singleChunkResultIterator =
+ new SingleChunkResultIterator(new TableResultIterator(mutationState, scan,
+ context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName),
+ renewLeaseThreshold, plan, DefaultParallelScanGrouper.getInstance()),
+ chunkSize);
+ resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan);
}
return resultIterator;
}
@@ -174,9 +184,6 @@ public class ChunkedResultIterator implements PeekingResultIterator {
// be able to start the next chunk on the next row key
if (rowCount == chunkSize) {
next.getKey(lastKey);
- if (scan.getAttribute(STARTKEY_OFFSET) != null) {
- addRegionStartKeyToLaskKey();
- }
} else if (rowCount > chunkSize && rowKeyChanged(next)) {
chunkComplete = true;
return null;
@@ -203,29 +210,10 @@ public class ChunkedResultIterator implements PeekingResultIterator {
int offset = lastKey.getOffset();
int length = lastKey.getLength();
newTuple.getKey(lastKey);
- if (scan.getAttribute(STARTKEY_OFFSET) != null) {
- addRegionStartKeyToLaskKey();
- }
return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0;
}
- /**
- * Prefix region start key to last key to form actual row key in case of local index scan.
- */
- private void addRegionStartKeyToLaskKey() {
- byte[] offsetBytes = scan.getAttribute(STARTKEY_OFFSET);
- if (offsetBytes != null) {
- int startKeyOffset = Bytes.toInt(offsetBytes);
- byte[] actualLastkey =
- new byte[startKeyOffset + lastKey.getLength() - lastKey.getOffset()];
- System.arraycopy(scan.getStartRow(), 0, actualLastkey, 0, startKeyOffset);
- System.arraycopy(lastKey.get(), lastKey.getOffset(), actualLastkey,
- startKeyOffset, lastKey.getLength());
- lastKey.set(actualLastkey);
- }
- }
-
@Override
public String toString() {
return "SingleChunkResultIterator [rowCount=" + rowCount
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index 2258caf..b720b56 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.schema.TableRef;
@@ -28,8 +29,8 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac
@Override
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
- CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
- return new TableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+ CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index f25e373..dbe9910 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -20,15 +20,16 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
public interface ParallelIteratorFactory {
public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() {
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName)
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan)
throws SQLException {
return LookAheadResultIterator.wrap(scanner);
}
};
- PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException;
+ PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) throws SQLException;
}
\ No newline at end of file
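Note: every ParallelIteratorFactory implementation now receives the QueryPlan, which is what lets downstream iterators re-plan after a split. The smallest conforming implementation mirrors the NOOP_FACTORY above — it ignores the plan and simply wraps the scanner (sketch; assumes it lives in org.apache.phoenix.iterate so the sibling types resolve):

    package org.apache.phoenix.iterate;

    import java.sql.SQLException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.compile.StatementContext;

    // Sketch: minimal implementation of the widened interface.
    public class PassThroughIteratorFactory implements ParallelIteratorFactory {
        @Override
        public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner,
                Scan scan, String physicalTableName, QueryPlan plan) throws SQLException {
            return LookAheadResultIterator.wrap(scanner);
        }
    }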
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index a5664c7..d038f77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -54,20 +54,20 @@ public class ParallelIterators extends BaseResultIterators {
private static final String NAME = "PARALLEL";
private final ParallelIteratorFactory iteratorFactory;
- public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan)
throws SQLException {
- super(plan, perScanLimit, null, scanGrouper);
+ super(plan, perScanLimit, null, scanGrouper, scan);
this.iteratorFactory = iteratorFactory;
}
- public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory)
+ public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan)
throws SQLException {
- this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance());
+ this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan);
}
@Override
protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException {
+ final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException {
// Pre-populate nestedFutures lists so that we can shuffle the scans
// and add the future to the right nested list. By shuffling the scans
// we get better utilization of the cluster since our thread executor
@@ -98,7 +98,7 @@ public class ParallelIterators extends BaseResultIterators {
final Scan scan = scanLocation.getScan();
final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
- final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+ final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
@@ -109,7 +109,7 @@ public class ParallelIterators extends BaseResultIterators {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
}
- PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName);
+ PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName, ParallelIterators.this.plan);
// Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed.
iterator.peek();
allIterators.add(iterator);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index d2c89b9..0d0c5d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -59,9 +59,9 @@ public class SerialIterators extends BaseResultIterators {
private final Integer offset;
public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
- ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
+ ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan)
throws SQLException {
- super(plan, perScanLimit, offset, scanGrouper);
+ super(plan, perScanLimit, offset, scanGrouper, scan);
this.offset = offset;
// must be a offset or a limit specified or a SERIAL hint
Preconditions.checkArgument(
@@ -71,7 +71,7 @@ public class SerialIterators extends BaseResultIterators {
@Override
protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
- final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) {
+ final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper) {
ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
final String tableName = tableRef.getTable().getPhysicalName().getString();
final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
@@ -165,8 +165,8 @@ public class SerialIterators extends BaseResultIterators {
if (remainingOffset != null) {
currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset));
}
- TableResultIterator itr = new TableResultIterator(mutationState, tableRef, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold);
- PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName);
+ TableResultIterator itr = new TableResultIterator(mutationState, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, plan, scanGrouper);
+ PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
Tuple tuple;
if ((tuple = peekingItr.peek()) == null) {
peekingItr.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 4f85a5f..3136ca8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -36,6 +36,7 @@ import org.apache.commons.io.output.DeferredFileOutputStream;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
@@ -84,7 +85,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
this.services = services;
}
@Override
- public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException {
+ public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) throws SQLException {
ReadMetricQueue readRequestMetric = context.getReadMetricsQueue();
SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readRequestMetric, physicalTableName);
MemoryMetricsHolder memoryMetrics = new MemoryMetricsHolder(readRequestMetric, physicalTableName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index a7e3068..3fe6098 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.iterate;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
@@ -32,12 +34,17 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -57,6 +64,10 @@ public class TableResultIterator implements ResultIterator {
private final CombinableMetric scanMetrics;
private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR;
private final long renewLeaseThreshold;
+ private final QueryPlan plan;
+ private final ParallelScanGrouper scanGrouper;
+ private Tuple lastTuple = null;
+ private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@GuardedBy("this")
private ResultIterator scanIterator;
@@ -73,20 +84,24 @@ public class TableResultIterator implements ResultIterator {
this.renewLeaseThreshold = 0;
this.htable = null;
this.scan = null;
+ this.plan = null;
+ this.scanGrouper = null;
}
public static enum RenewLeaseStatus {
RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED
};
- public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
- long renewLeaseThreshold) throws SQLException {
+ public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics,
+ long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
this.scan = scan;
this.scanMetrics = scanMetrics;
- PTable table = tableRef.getTable();
+ this.plan = plan;
+ PTable table = plan.getTableRef().getTable();
htable = mutationState.getHTable(table);
this.scanIterator = UNINITIALIZED_SCANNER;
this.renewLeaseThreshold = renewLeaseThreshold;
+ this.scanGrouper = scanGrouper;
}
@Override
@@ -107,8 +122,42 @@ public class TableResultIterator implements ResultIterator {
@Override
public synchronized Tuple next() throws SQLException {
initScanner();
- Tuple t = scanIterator.next();
- return t;
+ try {
+ lastTuple = scanIterator.next();
+ if (lastTuple != null) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ lastTuple.getKey(ptr);
+ }
+ } catch (SQLException e) {
+ try {
+ throw ServerUtil.parseServerException(e);
+ } catch(StaleRegionBoundaryCacheException e1) {
+ if(ScanUtil.isNonAggregateScan(scan)) {
+ // For non-aggregate queries, if we get a stale region boundary exception we can
+ // continue scanning from the next value after the last fetched result.
+ Scan newScan = ScanUtil.newScan(scan);
+ newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW));
+ if(lastTuple != null) {
+ lastTuple.getKey(ptr);
+ byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ if(ScanUtil.isLocalIndex(newScan)) {
+ // If we just set the scan start row suffix, then on the server side we prepare
+ // the actual scan boundaries by prefixing the region start key.
+ newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix));
+ } else {
+ newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
+ }
+ }
+ plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
+ this.scanIterator =
+ plan.iterator(scanGrouper, newScan);
+ lastTuple = scanIterator.next();
+ } else {
+ throw e;
+ }
+ }
+ }
+ return lastTuple;
}
public synchronized void initScanner() throws SQLException {
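Note: the new next() above catches StaleRegionBoundaryCacheException for non-aggregate scans, clears the cached region boundaries, and re-creates the iterator from the QueryPlan starting just past the last row already returned. The recovery shape, reduced to a self-contained sketch (every name here is a hypothetical stand-in, not a Phoenix API):

    // Sketch of the resume-on-split pattern used by TableResultIterator.next().
    abstract class ResumingIterator<T> {
        static class StaleBoundary extends Exception {}

        private byte[] lastKey;                      // key of the last row handed out

        abstract T fetchNext() throws StaleBoundary; // underlying scanner
        abstract byte[] keyOf(T row);
        abstract void restartAfter(byte[] key);      // clear caches, rebuild scanner past key

        T next() throws StaleBoundary {
            try {
                T row = fetchNext();
                if (row != null) lastKey = keyOf(row);
                return row;
            } catch (StaleBoundary e) {
                restartAfter(lastKey);               // region split underneath us: resume
                T row = fetchNext();
                if (row != null) lastKey = keyOf(row);
                return row;
            }
        }
    }

Aggregate scans are rethrown instead, presumably because partially consumed aggregate results cannot simply be re-fetched mid-stream.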
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 0f5ac9b..8d7b54d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -20,10 +20,11 @@ package org.apache.phoenix.iterate;
import java.sql.SQLException;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.schema.TableRef;
public interface TableResultIteratorFactory {
- public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException;
+ public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 5b799a0..0c154e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -491,6 +491,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return iterator;
+ }
+
+ @Override
public long getEstimatedSize() {
return 0;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 51ac795..17d9b6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.iterate.ConcatResultIterator;
import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.iterate.PeekingResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -50,6 +51,7 @@ import org.apache.phoenix.monitoring.ReadMetricQueue;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+
import org.apache.phoenix.query.ConnectionQueryServices;
/**
@@ -120,8 +122,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
for (Scan scan : scans) {
// For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(),
- queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold);
+ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance());
PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
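Note: the MapReduce path takes the opposite approach from TableResultIterator — rather than resuming after a split, each split is read in full and the server-side boundary check is disabled (ref PHOENIX-2599 above). A minimal sketch of preparing such a scan, using the constant shown in this diff:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    // Sketch: copy a scan and mark it so the server-side observer does not reject
    // it when the hosting region has split since the boundaries were computed.
    final class MapReduceScans {
        static Scan newMapReduceScan(Scan template) throws IOException {
            Scan scan = new Scan(template);
            scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
            return scan;
        }
    }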
[3/3] phoenix git commit: PHOENIX-2628 Ensure split when iterating
through results handled correctly(Rajeshbabu)
Posted by ra...@apache.org.
PHOENIX-2628 Ensure split when iterating through results handled correctly(Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d700c1f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d700c1f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d700c1f0
Branch: refs/heads/master
Commit: d700c1f032a0f5d119c669100648caf040233ebe
Parents: 99713a6
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed May 4 18:55:49 2016 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed May 4 18:55:49 2016 +0530
----------------------------------------------------------------------
.../org/apache/phoenix/end2end/BaseViewIT.java | 5 +
.../phoenix/end2end/index/LocalIndexIT.java | 51 +--
.../phoenix/end2end/index/MutableIndexIT.java | 239 ++++++++++-
.../DelayedTableResultIteratorFactory.java | 9 +-
.../iterate/MockParallelIteratorFactory.java | 3 +-
.../regionserver/IndexHalfStoreFileReader.java | 412 +------------------
.../IndexHalfStoreFileReaderGenerator.java | 123 +++---
.../hbase/regionserver/LocalIndexSplitter.java | 37 --
.../LocalIndexStoreFileScanner.java | 254 ++++++++++++
.../phoenix/compile/ListJarsQueryPlan.java | 4 +
.../MutatingParallelIteratorFactory.java | 2 +-
.../org/apache/phoenix/compile/QueryPlan.java | 2 +
.../org/apache/phoenix/compile/ScanRanges.java | 11 +-
.../apache/phoenix/compile/TraceQueryPlan.java | 5 +
.../coprocessor/BaseScannerRegionObserver.java | 94 ++++-
.../GroupedAggregateRegionObserver.java | 16 +-
.../UngroupedAggregateRegionObserver.java | 8 +-
.../apache/phoenix/execute/AggregatePlan.java | 21 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 20 +-
.../phoenix/execute/ClientAggregatePlan.java | 6 +
.../apache/phoenix/execute/ClientScanPlan.java | 7 +-
.../apache/phoenix/execute/CorrelatePlan.java | 8 +-
.../phoenix/execute/DegenerateQueryPlan.java | 2 +-
.../apache/phoenix/execute/HashJoinPlan.java | 8 +-
.../execute/LiteralResultIterationPlan.java | 2 +-
.../org/apache/phoenix/execute/ScanPlan.java | 9 +-
.../phoenix/execute/SortMergeJoinPlan.java | 7 +-
.../phoenix/execute/TupleProjectionPlan.java | 8 +-
.../org/apache/phoenix/execute/UnionPlan.java | 6 +-
.../apache/phoenix/execute/UnnestArrayPlan.java | 8 +-
.../phoenix/iterate/BaseResultIterators.java | 112 +++--
.../phoenix/iterate/ChunkedResultIterator.java | 50 +--
.../DefaultTableResultIteratorFactory.java | 5 +-
.../iterate/ParallelIteratorFactory.java | 5 +-
.../phoenix/iterate/ParallelIterators.java | 14 +-
.../apache/phoenix/iterate/SerialIterators.java | 10 +-
.../phoenix/iterate/SpoolingResultIterator.java | 3 +-
.../phoenix/iterate/TableResultIterator.java | 59 ++-
.../iterate/TableResultIteratorFactory.java | 3 +-
.../apache/phoenix/jdbc/PhoenixStatement.java | 5 +
.../phoenix/mapreduce/PhoenixRecordReader.java | 5 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 64 +++
.../java/org/apache/phoenix/query/BaseTest.java | 2 +-
.../query/ParallelIteratorsSplitTest.java | 9 +-
.../hive/mapreduce/PhoenixRecordReader.java | 5 +-
45 files changed, 1078 insertions(+), 660 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
index 7e7175f..65f1f93 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseViewIT.java
@@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -45,6 +46,7 @@ import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
@@ -98,6 +100,9 @@ public abstract class BaseViewIT extends BaseOwnClusterHBaseManagedTimeIT {
if (saltBuckets == null) {
try (Connection conn = DriverManager.getConnection(getUrl())) {
HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName));
+ if(ScanUtil.isLocalIndex(scan)) {
+ ScanUtil.setLocalIndexAttributes(scan, 0, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, scan.getStartRow(), scan.getStopRow());
+ }
ResultScanner scanner = htable.getScanner(scan);
Result result = scanner.next();
// Confirm index has rows
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 2d79f36..f7edea7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -790,15 +790,27 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
admin.getConnection(), indexTable, false);
}
assertEquals(4 + i, regionsOfIndexTable.size());
+ String[] tIdColumnValues = new String[26];
+ String[] v1ColumnValues = new String[26];
+ int[] k1ColumnValue = new int[26];
String query = "SELECT t_id,k1,v1 FROM " + tableName;
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
- assertEquals(strings[25-j], rs.getString("t_id"));
- assertEquals(25-j, rs.getInt("k1"));
- assertEquals(strings[j], rs.getString("V1"));
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ v1ColumnValues[j] = rs.getString("V1");
}
+ Arrays.sort(tIdColumnValues);
+ Arrays.sort(v1ColumnValues);
+ Arrays.sort(k1ColumnValue);
+ assertTrue(Arrays.equals(strings, tIdColumnValues));
+ assertTrue(Arrays.equals(strings, v1ColumnValues));
+ for(int m=0;m<26;m++) {
+ assertEquals(m, k1ColumnValue[m]);
+ }
+
rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
assertEquals(
"CLIENT PARALLEL " + (4 + i) + "-WAY RANGE SCAN OVER "
@@ -817,11 +829,20 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
+ "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
+ int[] k3ColumnValue = new int[26];
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
- assertEquals(strings[j], rs.getString("t_id"));
- assertEquals(j, rs.getInt("k1"));
- assertEquals(j+2, rs.getInt("k3"));
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ k3ColumnValue[j] = rs.getInt("k3");
+ }
+ Arrays.sort(tIdColumnValues);
+ Arrays.sort(k1ColumnValue);
+ Arrays.sort(k3ColumnValue);
+ assertTrue(Arrays.equals(strings, tIdColumnValues));
+ for(int m=0;m<26;m++) {
+ assertEquals(m, k1ColumnValue[m]);
+ assertEquals(m+2, k3ColumnValue[m]);
}
}
} finally {
@@ -1013,24 +1034,6 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
assertEquals(5, regionsOfIndexTable.size());
boolean success = latch1.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS);
assertTrue("Timed out waiting for MockedLocalIndexSplitter.preSplitAfterPONR to complete", success);
- // Verify the metadata for index is correct.
- rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName, rs.getString(3));
- assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
- rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indexName+"_2",
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName+"_2", rs.getString(3));
- assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
-
- String query = "SELECT t_id,k1,v1 FROM " + tableName+"2";
- rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
- assertEquals("CLIENT PARALLEL " + 1 + "-WAY FULL SCAN OVER " + tableName+"2",
- QueryUtil.getExplainPlan(rs));
latch2.countDown();
} finally {
conn1.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index add282e..80f1250 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -31,14 +32,25 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import jline.internal.Log;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.Shadower;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -86,8 +98,8 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
@Parameters(name="localIndex = {0} , transactional = {1}")
public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] {
- { false, false }, { false, true }, { true, false }, { true, true }
+ return Arrays.asList(new Boolean[][] {
+ { false, false }, { false, true }, { true, false }, { true, true }
});
}
@@ -594,4 +606,227 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
}
}
+ @Test
+ public void testSplitDuringIndexScan() throws Exception {
+ testSplitDuringIndexScan(false);
+ }
+
+ @Test
+ public void testSplitDuringIndexReverseScan() throws Exception {
+ testSplitDuringIndexScan(true);
+ }
+
+ private void testSplitDuringIndexScan(boolean isReverse) throws Exception {
+ Properties props = new Properties();
+ props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(2));
+ props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+ try(Connection conn1 = DriverManager.getConnection(getUrl(), props)){
+ String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ dropTable(admin, conn1);
+ createTableAndLoadData(conn1, strings, isReverse);
+
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+ splitDuringScan(conn1, strings, admin, isReverse);
+ dropTable(admin, conn1);
+ }
+ }
+
+ private void dropTable(HBaseAdmin admin, Connection conn) throws SQLException, IOException {
+ conn.createStatement().execute("DROP TABLE IF EXISTS "+ tableName);
+ if(admin.tableExists(tableName)) {
+ admin.disableTable(TableName.valueOf(tableName));
+ admin.deleteTable(TableName.valueOf(tableName));
+ }
+ if(admin.tableExists(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName)) {
+ admin.disableTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+ admin.deleteTable(localIndex? MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+ }
+ }
+
+ private void createTableAndLoadData(Connection conn1, String[] strings, boolean isReverse) throws SQLException {
+ createBaseTable(conn1, tableName, null);
+ for (int i = 0; i < 26; i++) {
+ conn1.createStatement().execute(
+ "UPSERT INTO " + tableName + " values('"+strings[i]+"'," + i + ","
+ + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+ }
+ conn1.commit();
+ conn1.createStatement().execute(
+ "CREATE " + (localIndex ? "LOCAL" : "")+" INDEX " + indexName + " ON " + tableName + "(v1"+(isReverse?" DESC":"")+") include (k3)");
+ }
+
+ @Test
+ public void testIndexHalfStoreFileReader() throws Exception {
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ try {
+ dropTable(admin, conn1);
+ createBaseTable(conn1, tableName, "('e')");
+ conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+" INDEX " + indexName + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON ('e')"));
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
+ conn1.commit();
+
+ String query = "SELECT count(*) FROM " + tableName +" where v1<='z'";
+ ResultSet rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+
+ TableName table = TableName.valueOf(localIndex?tableName: indexName);
+ TableName indexTable = TableName.valueOf(localIndex?MetaDataUtil.getLocalIndexTableName(tableName): indexName);
+ admin.flush(indexTable);
+ boolean merged = false;
+ // merge regions until 1 left
+ end: while (true) {
+ long numRegions = 0;
+ while (true) {
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ System.out.println("Number of rows returned:" + rs.getInt(1));
+ assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results?
+ try {
+ List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable);
+ numRegions = indexRegions.size();
+ if (numRegions==1) {
+ break end;
+ }
+ if(!merged) {
+ List<HRegionInfo> regions =
+ admin.getTableRegions(localIndex ? table : indexTable);
+ System.out.println("Merging: " + regions.size());
+ admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
+ regions.get(1).getEncodedNameAsBytes(), false);
+ merged = true;
+ Threads.sleep(10000);
+ }
+ break;
+ } catch (Exception ex) {
+ Log.info(ex);
+ }
+
+ long waitStartTime = System.currentTimeMillis();
+ // wait until the merge has happened
+ while (System.currentTimeMillis() - waitStartTime < 10000) {
+ List<HRegionInfo> regions = admin.getTableRegions(indexTable);
+ System.out.println("Waiting:" + regions.size());
+ if (regions.size() < numRegions) {
+ break;
+ }
+ Threads.sleep(1000);
+ }
+ }
+ }
+ } finally {
+ dropTable(admin, conn1);
+ }
+ }
+
+ private List<HRegionInfo> mergeRegions(HBaseAdmin admin, List<HRegionInfo> regionsOfUserTable)
+ throws IOException, InterruptedException {
+ for (int i = 2; i > 0; i--) {
+ Threads.sleep(10000);
+ admin.mergeRegions(regionsOfUserTable.get(0).getEncodedNameAsBytes(),
+ regionsOfUserTable.get(1).getEncodedNameAsBytes(), false);
+ regionsOfUserTable =
+ MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), admin.getConnection(),
+ TableName.valueOf(localIndex? tableName:indexName), false);
+
+ while (regionsOfUserTable.size() != i) {
+ Thread.sleep(100);
+ regionsOfUserTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(localIndex? tableName:indexName), false);
+ }
+ assertEquals(i, regionsOfUserTable.size());
+ if(localIndex) {
+ List<HRegionInfo> regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false);
+ while (regionsOfIndexTable.size() != i) {
+ Thread.sleep(100);
+ regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(MetaDataUtil.getLocalIndexTableName(tableName)), false);
+ }
+ assertEquals(i, regionsOfIndexTable.size());
+ }
+ }
+ return regionsOfUserTable;
+ }
+
+ private List<HRegionInfo> splitDuringScan(Connection conn1, String[] strings, HBaseAdmin admin, boolean isReverse)
+ throws SQLException, IOException, InterruptedException {
+ ResultSet rs;
+ String query = "SELECT t_id,k1,v1 FROM " + tableName;
+ rs = conn1.createStatement().executeQuery(query);
+ String[] tIdColumnValues = new String[26];
+ String[] v1ColumnValues = new String[26];
+ int[] k1ColumnValue = new int[26];
+ for (int j = 0; j < 5; j++) {
+ assertTrue(rs.next());
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ v1ColumnValues[j] = rs.getString("V1");
+ }
+
+ String[] splitKeys = new String[2];
+ splitKeys[0] = strings[4];
+ splitKeys[1] = strings[12];
+
+ int[] splitInts = new int[2];
+ splitInts[0] = 22;
+ splitInts[1] = 4;
+ List<HRegionInfo> regionsOfUserTable = null;
+ for(int i = 0; i <=1; i++) {
+ Threads.sleep(10000);
+ if(localIndex) {
+ admin.split(Bytes.toBytes(tableName),
+ ByteUtil.concat(Bytes.toBytes(splitKeys[i])));
+ } else {
+ admin.split(Bytes.toBytes(indexName), ByteUtil.concat(Bytes.toBytes(splitInts[i])));
+ }
+ Thread.sleep(100);
+ regionsOfUserTable =
+ MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(), TableName.valueOf(localIndex?tableName:indexName),
+ false);
+
+ while (regionsOfUserTable.size() != (i+2)) {
+ Thread.sleep(100);
+ regionsOfUserTable =
+ MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
+ admin.getConnection(),
+ TableName.valueOf(localIndex?tableName:indexName), false);
+ }
+ assertEquals(i+2, regionsOfUserTable.size());
+ }
+ for (int j = 5; j < 26; j++) {
+ assertTrue(rs.next());
+ tIdColumnValues[j] = rs.getString("t_id");
+ k1ColumnValue[j] = rs.getInt("k1");
+ v1ColumnValues[j] = rs.getString("V1");
+ }
+ Arrays.sort(tIdColumnValues);
+ Arrays.sort(v1ColumnValues);
+ Arrays.sort(k1ColumnValue);
+ assertTrue(Arrays.equals(strings, tIdColumnValues));
+ assertTrue(Arrays.equals(strings, v1ColumnValues));
+ for(int i=0;i<26;i++) {
+ assertEquals(i, k1ColumnValue[i]);
+ }
+ assertFalse(rs.next());
+ return regionsOfUserTable;
+ }
+
+ private void createBaseTable(Connection conn, String tableName, String splits) throws SQLException {
+ String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
+ "k1 INTEGER NOT NULL,\n" +
+ "k2 INTEGER NOT NULL,\n" +
+ "k3 INTEGER,\n" +
+ "v1 VARCHAR,\n" +
+ "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
+ + (tableDDLOptions!=null?tableDDLOptions:"") + (splits != null ? (" split on " + splits) : "");
+ conn.createStatement().execute(ddl);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
index 74deb71..55bed91 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/DelayedTableResultIteratorFactory.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
@@ -39,13 +40,13 @@ public class DelayedTableResultIteratorFactory implements TableResultIteratorFac
@Override
public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan,
- CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
- return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+ CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ return new DelayedTableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
}
private class DelayedTableResultIterator extends TableResultIterator {
- public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException {
- super(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
+ public DelayedTableResultIterator (MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+ super(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
index d8a08e6..b5c5f0f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.schema.PTable;
@@ -33,7 +34,7 @@ public class MockParallelIteratorFactory implements ParallelIteratorFactory {
@Override
public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan,
- String physicalTableName) throws SQLException {
+ String physicalTableName, QueryPlan plan) throws SQLException {
return new MockResultIterator(String.valueOf(counter.incrementAndGet()), table);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
index cbc4ed6..d1d12fb 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java
@@ -18,27 +18,18 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.index.IndexMaintainer;
/**
@@ -57,7 +48,6 @@ import org.apache.phoenix.index.IndexMaintainer;
*/
public class IndexHalfStoreFileReader extends StoreFile.Reader {
- private static final int ROW_KEY_LENGTH = 2;
private final boolean top;
// This is the key we split around. Its the first possible entry on a row:
// i.e. empty column and a timestamp of LATEST_TIMESTAMP.
@@ -73,36 +63,6 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
* @param fs
* @param p
* @param cacheConf
- * @param r
- * @param conf
- * @param indexMaintainers
- * @param viewConstants
- * @param regionInfo
- * @param regionStartKeyInHFile
- * @param splitKey
- * @throws IOException
- */
- public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
- final Reference r, final Configuration conf,
- final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
- final byte[][] viewConstants, final HRegionInfo regionInfo,
- final byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException {
- super(fs, p, cacheConf, conf);
- this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
- // Is it top or bottom half?
- this.top = Reference.isTopFileRegion(r.getFileRegion());
- this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
- this.indexMaintainers = indexMaintainers;
- this.viewConstants = viewConstants;
- this.regionInfo = regionInfo;
- this.regionStartKeyInHFile = regionStartKeyInHFile;
- this.offset = regionStartKeyInHFile.length;
- }
-
- /**
- * @param fs
- * @param p
- * @param cacheConf
* @param in
* @param size
* @param r
@@ -132,371 +92,35 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader {
this.offset = regionStartKeyInHFile.length;
}
- protected boolean isTop() {
- return this.top;
+ public int getOffset() {
+ return offset;
}
- @Override
- public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
- final boolean isCompaction) {
- final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
- return new HFileScanner() {
- final HFileScanner delegate = s;
- public boolean atEnd = false;
-
- @Override
- public ByteBuffer getKey() {
- if (atEnd) {
- return null;
- }
- boolean changeBottomKeys =
- regionInfo.getStartKey().length == 0 && splitRow.length != offset;
- if (!top) {
- // For first region we are prepending empty byte array of length region end key.
- // So if split row length is not equal to region end key length then we need to
- // replace empty bytes of split row length. Because after split end key is the split
- // row.
- if(!changeBottomKeys) return delegate.getKey();
- }
- // If it is top store file replace the StartKey of the Key with SplitKey
- return getChangedKey(delegate.getKeyValue(), changeBottomKeys);
- }
-
- private ByteBuffer getChangedKey(Cell kv, boolean changeBottomKeys) {
- // new KeyValue(row, family, qualifier, timestamp, type, value)
- byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
- KeyValue newKv =
- new KeyValue(newRowkey, 0, newRowkey.length, kv.getFamilyArray(),
- kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
- kv.getQualifierOffset(), kv.getQualifierLength(),
- kv.getTimestamp(), Type.codeToType(kv.getTypeByte()), null, 0, 0);
- ByteBuffer keyBuffer = ByteBuffer.wrap(newKv.getKey());
- return keyBuffer;
- }
-
- private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) {
- int lenOfRemainingKey = kv.getRowLength() - offset;
- byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + splitRow.length];
- System.arraycopy(changeBottomKeys ? new byte[splitRow.length] : splitRow, 0,
- keyReplacedStartKey, 0, splitRow.length);
- System.arraycopy(kv.getRowArray(), kv.getRowOffset() + offset, keyReplacedStartKey,
- splitRow.length, lenOfRemainingKey);
- return keyReplacedStartKey;
- }
-
- @Override
- public String getKeyString() {
- if (atEnd) {
- return null;
- }
- return Bytes.toStringBinary(getKey());
- }
-
- @Override
- public ByteBuffer getValue() {
- if (atEnd) {
- return null;
- }
- return delegate.getValue();
- }
-
- @Override
- public String getValueString() {
- if (atEnd) {
- return null;
- }
- return Bytes.toStringBinary(getValue());
- }
-
- @Override
- public Cell getKeyValue() {
- if (atEnd) {
- return null;
- }
- Cell kv = delegate.getKeyValue();
- boolean changeBottomKeys =
- regionInfo.getStartKey().length == 0 && splitRow.length != offset;
- if (!top) {
- if(!changeBottomKeys) return kv;
- }
- // If it is a top store file change the StartKey with SplitKey in Key
- // and produce the new value corresponding to the change in key
- byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys);
- KeyValue changedKv =
- new KeyValue(changedKey, 0, changedKey.length, kv.getFamilyArray(),
- kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(),
- kv.getQualifierOffset(), kv.getQualifierLength(),
- kv.getTimestamp(), Type.codeToType(kv.getTypeByte()),
- kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
- kv.getTagsArray(), kv.getTagsOffset(), kv.getTagsLength());
- return changedKv;
- }
-
- @Override
- public boolean next() throws IOException {
- if (atEnd) {
- return false;
- }
- while (true) {
- boolean b = delegate.next();
- if (!b) {
- atEnd = true;
- return b;
- }
- // We need to check whether the current KV pointed by this reader is
- // corresponding to
- // this split or not.
- // In case of top store file if the ActualRowKey >= SplitKey
- // In case of bottom store file if the ActualRowKey < Splitkey
- if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
- return true;
- }
- }
- }
-
- @Override
- public boolean seekBefore(byte[] key) throws IOException {
- return seekBefore(key, 0, key.length);
- }
-
- @Override
- public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
-
- if (top) {
- byte[] fk = getFirstKey();
- // This will be null when the file is empty in which we can not seekBefore to
- // any key
- if (fk == null) {
- return false;
- }
- if (getComparator().compare(key, offset, length, fk, 0, fk.length) <= 0) {
- return false;
- }
- KeyValue replacedKey = getKeyPresentInHFiles(key);
- return this.delegate.seekBefore(replacedKey);
- } else {
- // The equals sign isn't strictly necessary just here to be consistent with
- // seekTo
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
- return this.delegate.seekBefore(splitkey, 0, splitkey.length);
- }
- }
- return this.delegate.seekBefore(key, offset, length);
- }
-
- @Override
- public boolean seekBefore(Cell cell) throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- return seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
- }
-
- @Override
- public boolean seekTo() throws IOException {
- boolean b = delegate.seekTo();
- if (!b) {
- atEnd = true;
- return b;
- }
- while (true) {
- // We need to check the first occurrence of satisfying the condition
- // In case of top store file if the ActualRowKey >= SplitKey
- // In case of bottom store file if the ActualRowKey < Splitkey
- if (isSatisfiedMidKeyCondition(delegate.getKeyValue())) {
- return true;
- }
- b = delegate.next();
- if (!b) {
- return b;
- }
- }
- }
-
- @Override
- public int seekTo(byte[] key) throws IOException {
- return seekTo(key, 0, key.length);
- }
-
- @Override
- public int seekTo(byte[] key, int offset, int length) throws IOException {
- if (top) {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
- return -1;
- }
- KeyValue replacedKey = getKeyPresentInHFiles(key);
-
- int seekTo =
- delegate.seekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
- replacedKey.getKeyLength());
- return seekTo;
- /*
- * if (seekTo == 0 || seekTo == -1) { return seekTo; } else if (seekTo == 1) {
- * boolean next = this.next(); }
- */
- } else {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
- // we would place the scanner in the second half.
- // it might be an error to return false here ever...
- boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
- if (!res) {
- throw new IOException(
- "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
- }
- return 1;
- }
- }
- return delegate.seekTo(key, offset, length);
- }
-
- @Override
- public int seekTo(Cell cell) throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- return seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
- }
-
- @Override
- public int reseekTo(byte[] key) throws IOException {
- return reseekTo(key, 0, key.length);
- }
-
- @Override
- public int reseekTo(byte[] key, int offset, int length) throws IOException {
- if (top) {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) {
- return -1;
- }
- KeyValue replacedKey = getKeyPresentInHFiles(key);
- return delegate.reseekTo(replacedKey.getBuffer(), replacedKey.getKeyOffset(),
- replacedKey.getKeyLength());
- } else {
- if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) >= 0) {
- // we would place the scanner in the second half.
- // it might be an error to return false here ever...
- boolean res = delegate.seekBefore(splitkey, 0, splitkey.length);
- if (!res) {
- throw new IOException(
- "Seeking for a key in bottom of file, but key exists in top of file, failed on seekBefore(midkey)");
- }
- return 1;
- }
- }
- return delegate.reseekTo(key, offset, length);
- }
-
- @Override
- public int reseekTo(Cell cell) throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- return reseekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
- }
-
- @Override
- public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
- return this.delegate.getReader();
- }
-
- // TODO: Need to change as per IndexHalfStoreFileReader
- @Override
- public boolean isSeeked() {
- return this.delegate.isSeeked();
- }
-
- // Added for compatibility with HBASE-13109
- // Once we drop support for older versions, add an @override annotation here
- // and figure out how to get the next indexed key
- public Cell getNextIndexedKey() {
- return null; // indicate that we cannot use the optimization
- }
- };
+ public byte[][] getViewConstants() {
+ return viewConstants;
}
- private boolean isSatisfiedMidKeyCondition(Cell kv) {
- if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
- // In case of a Delete type KV, let it be going to both the daughter regions.
- // No problems in doing so. In the correct daughter region where it belongs to, this delete
- // tomb will really delete a KV. In the other it will just hang around there with no actual
- // kv coming for which this is a delete tomb. :)
- return true;
- }
- ImmutableBytesWritable rowKey =
- new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + offset,
- kv.getRowLength() - offset);
- Entry<ImmutableBytesWritable, IndexMaintainer> entry = indexMaintainers.entrySet().iterator().next();
- IndexMaintainer indexMaintainer = entry.getValue();
- byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey);
- IndexMaintainer actualIndexMaintainer = indexMaintainers.get(new ImmutableBytesWritable(viewIndexId));
- byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, this.viewConstants);
- int compareResult = Bytes.compareTo(dataRowKey, splitRow);
- if (top) {
- if (compareResult >= 0) {
- return true;
- }
- } else {
- if (compareResult < 0) {
- return true;
- }
- }
- return false;
+ public Map<ImmutableBytesWritable, IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
}
- /**
- * In case of top half store, the passed key will be with the start key of the daughter region.
- * But in the actual HFiles, the key will be with the start key of the old parent region. In
- * order to make the real seek in the HFiles, we need to build the old key.
- *
- * The logic here is just replace daughter region start key with parent region start key
- * in the key part.
- *
- * @param key
- *
- */
- private KeyValue getKeyPresentInHFiles(byte[] key) {
- KeyValue keyValue = new KeyValue(key);
- int rowLength = keyValue.getRowLength();
- int rowOffset = keyValue.getRowOffset();
-
- int daughterStartKeyLength =
- regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo
- .getStartKey().length;
-
- // This comes incase of deletefamily
- if (top
- && 0 == keyValue.getValueLength()
- && keyValue.getTimestamp() == HConstants.LATEST_TIMESTAMP
- && Bytes.compareTo(keyValue.getRowArray(), keyValue.getRowOffset(),
- keyValue.getRowLength(), splitRow, 0, splitRow.length) == 0
- && CellUtil.isDeleteFamily(keyValue)) {
- KeyValue createFirstDeleteFamilyOnRow =
- KeyValueUtil.createFirstDeleteFamilyOnRow(regionStartKeyInHFile,
- keyValue.getFamily());
- return createFirstDeleteFamilyOnRow;
- }
+ public HRegionInfo getRegionInfo() {
+ return regionInfo;
+ }
- short length = (short) (keyValue.getRowLength() - daughterStartKeyLength + offset);
- byte[] replacedKey =
- new byte[length + key.length - (rowOffset + rowLength) + ROW_KEY_LENGTH];
- System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_KEY_LENGTH);
- System.arraycopy(regionStartKeyInHFile, 0, replacedKey, ROW_KEY_LENGTH, offset);
- System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + daughterStartKeyLength,
- replacedKey, offset + ROW_KEY_LENGTH, keyValue.getRowLength()
- - daughterStartKeyLength);
- System.arraycopy(key, rowOffset + rowLength, replacedKey,
- offset + keyValue.getRowLength() - daughterStartKeyLength
- + ROW_KEY_LENGTH, key.length - (rowOffset + rowLength));
- return KeyValue.createKeyValueFromKey(replacedKey);
+ public byte[] getRegionStartKeyInHFile() {
+ return regionStartKeyInHFile;
}
- @Override
- public byte[] midkey() throws IOException {
- // Returns null to indicate file is not splitable.
- return null;
+ public byte[] getSplitkey() {
+ return splitkey;
}
- @Override
- public byte[] getFirstKey() {
- return super.getFirstKey();
+ public byte[] getSplitRow() {
+ return splitRow;
}
- @Override
- public boolean passesKeyRangeFilter(Scan scan) {
- return true;
+ public boolean isTop() {
+ return top;
}
-}
+}
\ No newline at end of file
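
With the scanner logic factored out (into LocalIndexStoreFileScanner, added below), IndexHalfStoreFileReader is reduced to a holder for the split metadata, exposed through the new getters. A hedged sketch of how a caller reads that state ('storeFile' is a hypothetical StoreFile whose reader was opened by IndexHalfStoreFileReaderGenerator):

    StoreFile.Reader r = storeFile.getReader();
    if (r instanceof IndexHalfStoreFileReader) {
        IndexHalfStoreFileReader half = (IndexHalfStoreFileReader) r;
        int offset = half.getOffset();          // length of the region start key prefix in the HFile
        byte[] splitRow = half.getSplitRow();   // data table row key the region split around
        boolean top = half.isTop();             // true when this reader serves the upper daughter
    }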
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
index b48314d..6cf8fa1 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,11 +53,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.AlterIndexStatement;
-import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
@@ -67,10 +65,6 @@ import org.apache.phoenix.util.QueryUtil;
public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
- int storeFilesCount = 0;
- int compactedFilesCount = 0;
- private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
-
@Override
public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
@@ -116,7 +110,17 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
// We need not change anything in the first region's data because the first region's
// start key is equal to the merged region's start key, so we return the same reader.
if (Bytes.compareTo(mergeRegions.getFirst().getStartKey(), splitRow) == 0) {
- return reader;
+ if (mergeRegions.getFirst().getStartKey().length == 0
+ && region.getRegionInfo().getEndKey().length != mergeRegions
+ .getFirst().getEndKey().length) {
+ childRegion = mergeRegions.getFirst();
+ regionStartKeyInHFile =
+ mergeRegions.getFirst().getStartKey().length == 0 ? new byte[mergeRegions
+ .getFirst().getEndKey().length] : mergeRegions.getFirst()
+ .getStartKey();
+ } else {
+ return reader;
+ }
} else {
childRegion = mergeRegions.getSecond();
regionStartKeyInHFile = mergeRegions.getSecond().getStartKey();
@@ -170,58 +174,31 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
}
return reader;
}
-
+
+ @SuppressWarnings("deprecation")
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
- InternalScanner internalScanner = super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request);
- Collection<StoreFile> files = request.getFiles();
- storeFilesCount = 0;
- compactedFilesCount = 0;
- for(StoreFile file:files) {
- if(!file.isReference()) {
- return internalScanner;
- }
+ if (!scanType.equals(ScanType.COMPACT_DROP_DELETES) || s != null || !store.hasReferences()) {
+ return s;
}
- storeFilesCount = files.size();
- return internalScanner;
- }
-
- @Override
- public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
- StoreFile resultFile) throws IOException {
- super.postCompact(e, store, resultFile);
- if(storeFilesCount > 0) compactedFilesCount++;
- if(compactedFilesCount == storeFilesCount) {
- PhoenixConnection conn = null;
- try {
- conn = QueryUtil.getConnectionOnServer(e.getEnvironment().getConfiguration()).unwrap(
- PhoenixConnection.class);
- MetaDataClient client = new MetaDataClient(conn);
- String userTableName = MetaDataUtil.getUserTableName(e.getEnvironment().getRegion().getTableDesc().getNameAsString());
- PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
- List<PTable> indexes = dataTable.getIndexes();
- for (PTable index : indexes) {
- if (index.getIndexType() == IndexType.LOCAL) {
- AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTable.getTableName().getString(), false, PIndexState.ACTIVE);
- client.alterIndex(indexStatement);
- }
- }
- conn.commit();
- } catch (ClassNotFoundException ex) {
- } catch (SQLException ex) {
- } finally {
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException ex) {
- }
- }
+ List<StoreFileScanner> newScanners = new ArrayList<StoreFileScanner>(scanners.size());
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getFamily().getMaxVersions());
+ boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+ for(KeyValueScanner scanner: scanners) {
+ Reader reader = ((StoreFileScanner) scanner).getReader();
+ if (reader instanceof IndexHalfStoreFileReader) {
+ newScanners.add(new LocalIndexStoreFileScanner(reader, reader.getScanner(
+ scan.getCacheBlocks(), scanUsePread, false), true, reader.getHFileReader()
+ .hasMVCCInfo(), store.getSmallestReadPoint()));
+ } else {
+ newScanners.add(((StoreFileScanner) scanner));
}
}
+ return new StoreScanner(store, store.getScanInfo(), scan, newScanners,
+ scanType, store.getSmallestReadPoint(), earliestPutTs);
}
private byte[][] getViewConstants(PTable dataTable) {
@@ -254,4 +231,42 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
}
return viewConstants;
}
+
+ @Override
+ public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
+ final KeyValueScanner s) throws IOException {
+ if(store.hasReferences()) {
+ long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel());
+ boolean scanUsePread = c.getEnvironment().getConfiguration().getBoolean("hbase.storescanner.use.pread", scan.isSmall());
+ Collection<StoreFile> storeFiles = store.getStorefiles();
+ List<StoreFile> nonReferenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size());
+ List<StoreFile> referenceStoreFiles = new ArrayList<StoreFile>(store.getStorefiles().size());
+ List<KeyValueScanner> keyValueScanners = new ArrayList<KeyValueScanner>(store.getStorefiles().size()+1);
+ for(StoreFile storeFile : storeFiles) {
+ if (storeFile.isReference()) {
+ referenceStoreFiles.add(storeFile);
+ } else {
+ nonReferenceStoreFiles.add(storeFile);
+ }
+ }
+ List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(nonReferenceStoreFiles, scan.getCacheBlocks(), scanUsePread, readPt);
+ keyValueScanners.addAll(scanners);
+ for(StoreFile sf : referenceStoreFiles) {
+ if(sf.getReader() instanceof IndexHalfStoreFileReader) {
+ keyValueScanners.add(new LocalIndexStoreFileScanner(sf.getReader(), sf.getReader()
+ .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
+ .getReader().getHFileReader().hasMVCCInfo(), readPt));
+ } else {
+ keyValueScanners.add(new StoreFileScanner(sf.getReader(), sf.getReader()
+ .getScanner(scan.getCacheBlocks(), scanUsePread, false), true, sf
+ .getReader().getHFileReader().hasMVCCInfo(), readPt));
+ }
+ }
+ keyValueScanners.addAll(((HStore)store).memstore.getScanners(readPt));
+ if(!scan.isReversed()) return new StoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners);
+ return new ReversedStoreScanner(scan, store.getScanInfo(), ScanType.USER_SCAN, targetCols, keyValueScanners);
+ }
+ return s;
+ }
}
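
Both preCompactScannerOpen and preStoreScannerOpen repeat the same decision: wrap readers of reference (half) files in a LocalIndexStoreFileScanner, and leave plain files on the stock StoreFileScanner. A hedged sketch of that shared branch as a helper (the wrapStoreFile name is hypothetical; the calls mirror those used in the patch):

    // Hypothetical helper, not part of the patch: pick the scanner type based
    // on whether the store file is a local index half file.
    private KeyValueScanner wrapStoreFile(StoreFile sf, Scan scan, boolean pread, long readPt) {
        StoreFile.Reader r = sf.getReader();
        HFileScanner hfs = r.getScanner(scan.getCacheBlocks(), pread, false);
        boolean hasMVCC = r.getHFileReader().hasMVCCInfo();
        return r instanceof IndexHalfStoreFileReader
                ? new LocalIndexStoreFileScanner(r, hfs, true, hasMVCC, readPt)
                : new StoreFileScanner(r, hfs, true, hasMVCC, readPt);
    }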
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
index 63cf3ba..ba158a8 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
@@ -31,23 +31,14 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.phoenix.hbase.index.util.VersionUtil;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.parse.AlterIndexStatement;
import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
-import java.sql.SQLException;
import java.util.List;
public class LocalIndexSplitter extends BaseRegionObserver {
@@ -144,34 +135,6 @@ public class LocalIndexSplitter extends BaseRegionObserver {
throws IOException {
if (st == null || daughterRegions == null) return;
RegionCoprocessorEnvironment environment = ctx.getEnvironment();
- PhoenixConnection conn = null;
- try {
- conn = QueryUtil.getConnectionOnServer(ctx.getEnvironment().getConfiguration()).unwrap(
- PhoenixConnection.class);
- MetaDataClient client = new MetaDataClient(conn);
- String userTableName = ctx.getEnvironment().getRegion().getTableDesc().getNameAsString();
- PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
- List<PTable> indexes = dataTable.getIndexes();
- for (PTable index : indexes) {
- if (index.getIndexType() == IndexType.LOCAL) {
- AlterIndexStatement indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- org.apache.phoenix.parse.TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTable.getTableName().getString(), false, PIndexState.INACTIVE);
- client.alterIndex(indexStatement);
- }
- }
- conn.commit();
- } catch (ClassNotFoundException ex) {
- } catch (SQLException ex) {
- } finally {
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException ex) {
- }
- }
- }
-
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
st.stepsAfterPONR(rs, rs, daughterRegions);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
new file mode 100644
index 0000000..a6e5005
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexStoreFileScanner.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.index.IndexMaintainer;
+
+import static org.apache.hadoop.hbase.KeyValue.ROW_LENGTH_SIZE;
+
+public class LocalIndexStoreFileScanner extends StoreFileScanner {
+
+ private IndexHalfStoreFileReader reader;
+ private boolean changeBottomKeys;
+ public LocalIndexStoreFileScanner(Reader reader, HFileScanner hfs, boolean useMVCC,
+ boolean hasMVCC, long readPt) {
+ super(reader, hfs, useMVCC, hasMVCC, readPt);
+ this.reader = ((IndexHalfStoreFileReader)super.getReader());
+ this.changeBottomKeys =
+ this.reader.getRegionInfo().getStartKey().length == 0
+ && this.reader.getSplitRow().length != this.reader.getOffset();
+ }
+
+ @Override
+ public Cell next() throws IOException {
+ Cell next = super.next();
+ while(next !=null && !isSatisfiedMidKeyCondition(next)) {
+ next = super.next();
+ }
+ while(super.peek() != null && !isSatisfiedMidKeyCondition(super.peek())) {
+ super.next();
+ }
+ if (next!=null && (reader.isTop() || changeBottomKeys)) {
+ next = getChangedKey(next, !reader.isTop() && changeBottomKeys);
+ }
+ return next;
+ }
+
+ @Override
+ public Cell peek() {
+ Cell peek = super.peek();
+ if (peek != null && (reader.isTop() || changeBottomKeys)) {
+ peek = getChangedKey(peek, !reader.isTop() && changeBottomKeys);
+ }
+ return peek;
+ }
+
+ private KeyValue getChangedKey(Cell next, boolean changeBottomKeys) {
+ // If it is a top store file, replace the StartKey with the SplitKey in the key
+ // and produce the new value corresponding to the change in key.
+ byte[] changedKey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(next, changeBottomKeys);
+ KeyValue changedKv =
+ new KeyValue(changedKey, 0, changedKey.length, next.getFamilyArray(),
+ next.getFamilyOffset(), next.getFamilyLength(), next.getQualifierArray(),
+ next.getQualifierOffset(), next.getQualifierLength(),
+ next.getTimestamp(), Type.codeToType(next.getTypeByte()),
+ next.getValueArray(), next.getValueOffset(), next.getValueLength(),
+ next.getTagsArray(), next.getTagsOffset(), next.getTagsLength());
+ return changedKv;
+ }
+
+ @Override
+ public boolean seek(Cell key) throws IOException {
+ return seekOrReseek(key, true);
+ }
+
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ return seekOrReseek(key, false);
+ }
+
+ @Override
+ public boolean seekToPreviousRow(Cell key) throws IOException {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(key);
+ if (reader.isTop()) {
+ byte[] fk = reader.getFirstKey();
+ // This will be null when the file is empty, in which case we cannot seekBefore
+ // to any key.
+ if (fk == null) {
+ return false;
+ }
+ if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), fk, 0, fk.length) <= 0) {
+ return super.seekToPreviousRow(key);
+ }
+ KeyValue replacedKey = getKeyPresentInHFiles(kv.getBuffer());
+ boolean seekToPreviousRow = super.seekToPreviousRow(replacedKey);
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToPreviousRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToPreviousRow;
+ } else {
+ // The equals sign isn't strictly necessary; it's just here to be consistent
+ // with seekTo.
+ if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+ boolean seekToPreviousRow = super.seekToPreviousRow(kv);
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToPreviousRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToPreviousRow;
+ }
+ }
+ boolean seekToPreviousRow = super.seekToPreviousRow(kv);
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToPreviousRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToPreviousRow;
+ }
+
+ @Override
+ public boolean seekToLastRow() throws IOException {
+ boolean seekToLastRow = super.seekToLastRow();
+ while(super.peek()!=null && !isSatisfiedMidKeyCondition(super.peek())) {
+ seekToLastRow = super.seekToPreviousRow(super.peek());
+ }
+ return seekToLastRow;
+ }
+
+ private boolean isSatisfiedMidKeyCondition(Cell kv) {
+ if (CellUtil.isDelete(kv) && kv.getValueLength() == 0) {
+ // A Delete type KV may go to both daughter regions; no harm in that. In the
+ // daughter region it belongs to, this delete marker will actually delete a KV.
+ // In the other it will simply hang around with no matching KV for it to delete.
+ return true;
+ }
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(),
+ kv.getRowLength() - reader.getOffset());
+ Entry<ImmutableBytesWritable, IndexMaintainer> entry = reader.getIndexMaintainers().entrySet().iterator().next();
+ IndexMaintainer indexMaintainer = entry.getValue();
+ byte[] viewIndexId = indexMaintainer.getViewIndexIdFromIndexRowKey(rowKey);
+ IndexMaintainer actualIndexMaintainer = reader.getIndexMaintainers().get(new ImmutableBytesWritable(viewIndexId));
+ byte[] dataRowKey = actualIndexMaintainer.buildDataRowKey(rowKey, reader.getViewConstants());
+ int compareResult = Bytes.compareTo(dataRowKey, reader.getSplitRow());
+ if (reader.isTop()) {
+ if (compareResult >= 0) {
+ return true;
+ }
+ } else {
+ if (compareResult < 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * For the top half store, the passed key carries the start key of the daughter
+ * region, but in the actual HFiles the keys carry the start key of the old parent
+ * region. To perform the real seek in the HFiles we need to rebuild the old key.
+ *
+ * The logic simply replaces the daughter region's start key with the parent
+ * region's start key in the key part.
+ *
+ * @param key the key as seen in the daughter region
+ *
+ */
+ private KeyValue getKeyPresentInHFiles(byte[] key) {
+ KeyValue keyValue = new KeyValue(key);
+ int rowLength = keyValue.getRowLength();
+ int rowOffset = keyValue.getRowOffset();
+
+ short length = (short) (rowLength - reader.getSplitRow().length + reader.getOffset());
+ byte[] replacedKey =
+ new byte[length + key.length - (rowOffset + rowLength) + ROW_LENGTH_SIZE];
+ System.arraycopy(Bytes.toBytes(length), 0, replacedKey, 0, ROW_LENGTH_SIZE);
+ System.arraycopy(reader.getRegionStartKeyInHFile(), 0, replacedKey, ROW_LENGTH_SIZE, reader.getOffset());
+ System.arraycopy(keyValue.getRowArray(), keyValue.getRowOffset() + reader.getSplitRow().length,
+ replacedKey, reader.getOffset() + ROW_LENGTH_SIZE, rowLength
+ - reader.getSplitRow().length);
+ System.arraycopy(key, rowOffset + rowLength, replacedKey,
+ reader.getOffset() + keyValue.getRowLength() - reader.getSplitRow().length
+ + ROW_LENGTH_SIZE, key.length - (rowOffset + rowLength));
+ return new KeyValue.KeyOnlyKeyValue(replacedKey);
+ }
+
+ /**
+ * Seeks or reseeks the underlying scanner to the given cell.
+ * @param cell the cell to position the scanner at
+ * @param isSeek pass true for seek, false for reseek.
+ * @return whether the scanner could be positioned at a satisfying cell
+ * @throws IOException
+ */
+ public boolean seekOrReseek(Cell cell, boolean isSeek) throws IOException {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ KeyValue keyToSeek = kv;
+ if (reader.isTop()) {
+ if(getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) < 0){
+ if(!isSeek && realSeekDone()) {
+ return true;
+ }
+ return seekOrReseekToProperKey(isSeek, keyToSeek);
+ }
+ keyToSeek = getKeyPresentInHFiles(kv.getBuffer());
+ return seekOrReseekToProperKey(isSeek, keyToSeek);
+ } else {
+ if (getComparator().compare(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(), reader.getSplitkey(), 0, reader.getSplitkey().length) >= 0) {
+ close();
+ return false;
+ }
+ if(!isSeek && reader.getRegionInfo().getStartKey().length == 0 && reader.getSplitRow().length > reader.getRegionStartKeyInHFile().length) {
+ keyToSeek = getKeyPresentInHFiles(kv.getBuffer());
+ }
+ }
+ return seekOrReseekToProperKey(isSeek, keyToSeek);
+ }
+
+ private boolean seekOrReseekToProperKey(boolean isSeek, KeyValue kv)
+ throws IOException {
+ boolean seekOrReseek = isSeek ? super.seek(kv) : super.reseek(kv);
+ while (seekOrReseek && super.peek() != null
+ && !isSatisfiedMidKeyCondition(super.peek())) {
+ super.next();
+ seekOrReseek = super.peek() != null;
+ }
+ return seekOrReseek;
+ }
+
+ private byte[] getNewRowkeyByRegionStartKeyReplacedWithSplitKey(Cell kv, boolean changeBottomKeys) {
+ int lenOfRemainingKey = kv.getRowLength() - reader.getOffset();
+ byte[] keyReplacedStartKey = new byte[lenOfRemainingKey + reader.getSplitRow().length];
+ System.arraycopy(changeBottomKeys ? new byte[reader.getSplitRow().length] : reader.getSplitRow(), 0,
+ keyReplacedStartKey, 0, reader.getSplitRow().length);
+ System.arraycopy(kv.getRowArray(), kv.getRowOffset() + reader.getOffset(), keyReplacedStartKey,
+ reader.getSplitRow().length, lenOfRemainingKey);
+ return keyReplacedStartKey;
+ }
+
+}
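
The row key rewrite in getNewRowkeyByRegionStartKeyReplacedWithSplitKey is easiest to follow with concrete bytes. An illustrative walk-through with invented values:

    // Illustration only, values invented. Suppose the parent region's start
    // key prefix in the HFile is "a" (so offset == 1) and the region split at
    // data row "m". A stored local index row "a|idx" served by the top half
    // must be surfaced to clients as "m|idx":
    byte[] splitRow = Bytes.toBytes("m");
    byte[] storedRow = Bytes.toBytes("a|idx");
    int offset = 1;                                       // reader.getOffset()
    int lenOfRemainingKey = storedRow.length - offset;    // 4
    byte[] rewritten = new byte[lenOfRemainingKey + splitRow.length];
    System.arraycopy(splitRow, 0, rewritten, 0, splitRow.length);
    System.arraycopy(storedRow, offset, rewritten, splitRow.length, lenOfRemainingKey);
    // rewritten now holds "m|idx". For the bottom half with changeBottomKeys
    // set, a zeroed array of splitRow.length replaces the prefix instead.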
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 94736ed..b52e704 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -117,6 +117,10 @@ public class ListJarsQueryPlan implements QueryPlan {
}
@Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan s) throws SQLException {
+ return iterator(scanGrouper);
+ }
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
return new ResultIterator() {
private RemoteIterator<LocatedFileStatus> listFiles = null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index ed421a7..8e63fa9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -53,7 +53,7 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
@Override
- public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException {
+ public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan plan) throws SQLException {
final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
MutationState state = mutate(parentContext, iterator, clonedConnection);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 4dcc134..7722483 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -49,6 +49,8 @@ public interface QueryPlan extends StatementPlan {
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException;
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
+
public long getEstimatedSize();
// TODO: change once joins are supported
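
Plans that never issue an HBase scan can satisfy the new overload by delegating, exactly as ListJarsQueryPlan above and TraceQueryPlan below do; scanning plans instead use the passed Scan to pick up adjusted boundaries after a split. The minimal delegating implementation, as it appears elsewhere in this patch:

    // Delegation pattern for non-scanning plans: the Scan argument is
    // ignored because no HBase scan is ever issued.
    @Override
    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
        return iterator(scanGrouper);
    }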
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 719970a..5edaead 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -18,7 +18,6 @@
package org.apache.phoenix.compile;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
import java.io.IOException;
import java.util.ArrayList;
@@ -204,7 +203,7 @@ public class ScanRanges {
scan.setStopRow(scanRange.getUpperRange());
}
- private static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
+ public static byte[] prefixKey(byte[] key, int keyOffset, byte[] prefixKey, int prefixKeyOffset) {
if (key.length > 0) {
byte[] newKey = new byte[key.length + prefixKeyOffset];
int totalKeyOffset = keyOffset + prefixKeyOffset;
@@ -213,7 +212,7 @@ public class ScanRanges {
}
System.arraycopy(key, keyOffset, newKey, totalKeyOffset, key.length - keyOffset);
return newKey;
- }
+ }
return key;
}
@@ -229,7 +228,7 @@ public class ScanRanges {
return temp;
}
- private static byte[] stripPrefix(byte[] key, int keyOffset) {
+ public static byte[] stripPrefix(byte[] key, int keyOffset) {
if (key.length == 0) {
return key;
}
@@ -388,10 +387,6 @@ public class ScanRanges {
newScan.setAttribute(SCAN_ACTUAL_START_ROW, scanStartKey);
newScan.setStartRow(scanStartKey);
newScan.setStopRow(scanStopKey);
- if(keyOffset > 0) {
- newScan.setAttribute(STARTKEY_OFFSET, Bytes.toBytes(keyOffset));
- }
-
return newScan;
}
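
prefixKey is made public here so that utilities outside this class can rebuild local index scan boundaries by prepending a region boundary to a stored start/stop row suffix. A small worked example (byte values invented, assuming the prefix-copy behavior implied by the callers):

    // Example only: prepend a two-byte region start key to a stored suffix.
    byte[] suffix = new byte[] { 1, 2, 3 };        // e.g. a start row suffix
    byte[] regionStart = new byte[] { 10, 20 };    // lower inclusive region key
    byte[] startRow = ScanRanges.prefixKey(suffix, 0, regionStart, regionStart.length);
    // startRow == { 10, 20, 1, 2, 3 }: the region prefix followed by the suffix.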
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d700c1f0/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 54b4eb7..5e0977b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -115,6 +115,11 @@ public class TraceQueryPlan implements QueryPlan {
}
@Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return iterator(scanGrouper);
+ }
+
+ @Override
public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
final PhoenixConnection conn = stmt.getConnection();
if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) {