You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2015/02/22 05:47:48 UTC
[3/3] phoenix git commit: PHOENIX-1672 RegionScanner.nextRaw contract not implemented correctly
PHOENIX-1672 RegionScanner.nextRaw contract not implemented correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8d014cba
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8d014cba
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8d014cba
Branch: refs/heads/4.0
Commit: 8d014cbafe2b397c3cca5084abf80be759504e77
Parents: 221ff6b
Author: Andrew Purtell <ap...@apache.org>
Authored: Sat Feb 21 20:34:08 2015 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Feb 21 20:34:21 2015 -0800
----------------------------------------------------------------------
.../GroupedAggregateRegionObserver.java | 96 ++++----
.../UngroupedAggregateRegionObserver.java | 246 ++++++++++---------
.../phoenix/index/PhoenixIndexBuilder.java | 18 +-
.../iterate/RegionScannerResultIterator.java | 36 +--
4 files changed, 216 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 8b59b85..0984b06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -375,7 +375,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
* @param limit TODO
*/
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
- final RegionScanner s, final List<Expression> expressions,
+ final RegionScanner scanner, final List<Expression> expressions,
final ServerAggregators aggregators, long limit) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
@@ -410,28 +410,30 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
HRegion region = c.getEnvironment().getRegion();
region.startRegionOperation();
try {
- do {
- List<Cell> results = new ArrayList<Cell>();
- // Results are potentially returned even when the return
- // value of s.next is false
- // since this is an indication of whether or not there are
- // more values after the
- // ones returned
- hasMore = s.nextRaw(results);
- if (!results.isEmpty()) {
- result.setKeyValues(results);
- ImmutableBytesWritable key =
+ synchronized (scanner) {
+ do {
+ List<Cell> results = new ArrayList<Cell>();
+ // Results are potentially returned even when the return
+ // value of s.next is false
+ // since this is an indication of whether or not there are
+ // more values after the
+ // ones returned
+ hasMore = scanner.nextRaw(results);
+ if (!results.isEmpty()) {
+ result.setKeyValues(results);
+ ImmutableBytesWritable key =
TupleUtil.getConcatenatedValue(result, expressions);
- Aggregator[] rowAggregators = groupByCache.cache(key);
- // Aggregate values here
- aggregators.aggregate(rowAggregators, result);
- }
- } while (hasMore && groupByCache.size() < limit);
+ Aggregator[] rowAggregators = groupByCache.cache(key);
+ // Aggregate values here
+ aggregators.aggregate(rowAggregators, result);
+ }
+ } while (hasMore && groupByCache.size() < limit);
+ }
} finally {
region.closeRegionOperation();
}
- RegionScanner regionScanner = groupByCache.getScanner(s);
+ RegionScanner regionScanner = groupByCache.getScanner(scanner);
// Do not sort here, but sort back on the client instead
// The reason is that if the scan ever extends beyond a region
@@ -453,7 +455,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
* @throws IOException
*/
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Scan scan, final RegionScanner s, final List<Expression> expressions,
+ final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
final ServerAggregators aggregators, final long limit) throws IOException {
if (logger.isDebugEnabled()) {
@@ -466,12 +468,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
@Override
public HRegionInfo getRegionInfo() {
- return s.getRegionInfo();
+ return scanner.getRegionInfo();
}
@Override
public void close() throws IOException {
- s.close();
+ scanner.close();
}
@Override
@@ -488,32 +490,36 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
HRegion region = c.getEnvironment().getRegion();
region.startRegionOperation();
try {
- do {
- List<Cell> kvs = new ArrayList<Cell>();
- // Results are potentially returned even when the return
- // value of s.next is false
- // since this is an indication of whether or not there
- // are more values after the
- // ones returned
- hasMore = s.nextRaw(kvs);
- if (!kvs.isEmpty()) {
- result.setKeyValues(kvs);
- key = TupleUtil.getConcatenatedValue(result, expressions);
- aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
- if (!aggBoundary) {
- aggregators.aggregate(rowAggregators, result);
- if (logger.isDebugEnabled()) {
- logger.debug(LogUtil.addCustomAnnotations("Row passed filters: " + kvs
+ synchronized (scanner) {
+ do {
+ List<Cell> kvs = new ArrayList<Cell>();
+ // Results are potentially returned even when the return
+ // value of s.next is false
+ // since this is an indication of whether or not there
+ // are more values after the
+ // ones returned
+ hasMore = scanner.nextRaw(kvs);
+ if (!kvs.isEmpty()) {
+ result.setKeyValues(kvs);
+ key = TupleUtil.getConcatenatedValue(result, expressions);
+ aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
+ if (!aggBoundary) {
+ aggregators.aggregate(rowAggregators, result);
+ if (logger.isDebugEnabled()) {
+ logger.debug(LogUtil.addCustomAnnotations(
+ "Row passed filters: " + kvs
+ ", aggregated values: "
- + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan)));
+ + Arrays.asList(rowAggregators),
+ ScanUtil.getCustomAnnotations(scan)));
+ }
+ currentKey = key;
}
- currentKey = key;
}
- }
- atLimit = rowCount + countOffset >= limit;
- // Do rowCount + 1 b/c we don't have to wait for a complete
- // row in the case of a DISTINCT with a LIMIT
- } while (hasMore && !aggBoundary && !atLimit);
+ atLimit = rowCount + countOffset >= limit;
+ // Do rowCount + 1 b/c we don't have to wait for a complete
+ // row in the case of a DISTINCT with a LIMIT
+ } while (hasMore && !aggBoundary && !atLimit);
+ }
} finally {
region.closeRegionOperation();
}
@@ -555,7 +561,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
@Override
public long getMaxResultSize() {
- return s.getMaxResultSize();
+ return scanner.getMaxResultSize();
}
};
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a3b2faa..71c4dc6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -253,128 +253,152 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
final RegionScanner innerScanner = theScanner;
region.startRegionOperation();
try {
- do {
- List<Cell> results = new ArrayList<Cell>();
- // Results are potentially returned even when the return value of s.next is false
- // since this is an indication of whether or not there are more values after the
- // ones returned
- hasMore = innerScanner.nextRaw(results);
- if(stats != null) {
- stats.collectStatistics(results);
- }
-
- if (!results.isEmpty()) {
- rowCount++;
- result.setKeyValues(results);
- try {
- if (buildLocalIndex) {
- for (IndexMaintainer maintainer : indexMaintainers) {
- if (!results.isEmpty()) {
- result.getKey(ptr);
- ValueGetter valueGetter = maintainer.createGetterFromKeyValues(ImmutableBytesPtr.copyBytesIfNecessary(ptr),results);
- Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, c.getEnvironment().getRegion().getStartKey(), c.getEnvironment().getRegion().getEndKey());
- indexMutations.add(put);
+ synchronized (innerScanner) {
+ do {
+ List<Cell> results = new ArrayList<Cell>();
+ // Results are potentially returned even when the return value of s.next is false
+ // since this is an indication of whether or not there are more values after the
+ // ones returned
+ hasMore = innerScanner.nextRaw(results);
+ if (stats != null) {
+ stats.collectStatistics(results);
+ }
+ if (!results.isEmpty()) {
+ rowCount++;
+ result.setKeyValues(results);
+ try {
+ if (buildLocalIndex) {
+ for (IndexMaintainer maintainer : indexMaintainers) {
+ if (!results.isEmpty()) {
+ result.getKey(ptr);
+ ValueGetter valueGetter =
+ maintainer.createGetterFromKeyValues(
+ ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+ results);
+ Put put = maintainer.buildUpdateMutation(kvBuilder,
+ valueGetter, ptr, ts,
+ c.getEnvironment().getRegion().getStartKey(),
+ c.getEnvironment().getRegion().getEndKey());
+ indexMutations.add(put);
+ }
}
- }
- result.setKeyValues(results);
- } else if (isDelete) {
- // FIXME: the version of the Delete constructor without the lock args was introduced
- // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
- // of the client.
- Cell firstKV = results.get(0);
- Delete delete = new Delete(firstKV.getRowArray(), firstKV.getRowOffset(),
- firstKV.getRowLength(),ts);
- mutations.add(delete);
- } else if (isUpsert) {
- Arrays.fill(values, null);
- int i = 0;
- List<PColumn> projectedColumns = projectedTable.getColumns();
- for (; i < projectedTable.getPKColumns().size(); i++) {
- Expression expression = selectExpressions.get(i);
- if (expression.evaluate(result, ptr)) {
- values[i] = ptr.copyBytes();
- // If SortOrder from expression in SELECT doesn't match the
- // column being projected into then invert the bits.
- if (expression.getSortOrder() != projectedColumns.get(i).getSortOrder()) {
- SortOrder.invert(values[i], 0, values[i], 0, values[i].length);
+ result.setKeyValues(results);
+ } else if (isDelete) {
+ // FIXME: the version of the Delete constructor without the lock
+ // args was introduced in 0.94.4, thus if we try to use it here
+ // we can no longer use the 0.94.2 version of the client.
+ Cell firstKV = results.get(0);
+ Delete delete = new Delete(firstKV.getRowArray(),
+ firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+ mutations.add(delete);
+ } else if (isUpsert) {
+ Arrays.fill(values, null);
+ int i = 0;
+ List<PColumn> projectedColumns = projectedTable.getColumns();
+ for (; i < projectedTable.getPKColumns().size(); i++) {
+ Expression expression = selectExpressions.get(i);
+ if (expression.evaluate(result, ptr)) {
+ values[i] = ptr.copyBytes();
+ // If SortOrder from expression in SELECT doesn't match the
+ // column being projected into then invert the bits.
+ if (expression.getSortOrder() !=
+ projectedColumns.get(i).getSortOrder()) {
+ SortOrder.invert(values[i], 0, values[i], 0,
+ values[i].length);
+ }
}
}
- }
- projectedTable.newKey(ptr, values);
- PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
- for (; i < projectedColumns.size(); i++) {
- Expression expression = selectExpressions.get(i);
- if (expression.evaluate(result, ptr)) {
- PColumn column = projectedColumns.get(i);
- Object value = expression.getDataType().toObject(ptr, column.getSortOrder());
- // We are guaranteed that the two column will have the same type.
- if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
- expression.getMaxLength(), expression.getScale(),
- column.getMaxLength(), column.getScale())) {
- throw new ValueTypeIncompatibleException(column.getDataType(),
- column.getMaxLength(), column.getScale());
+ projectedTable.newKey(ptr, values);
+ PRow row = projectedTable.newRow(kvBuilder, ts, ptr);
+ for (; i < projectedColumns.size(); i++) {
+ Expression expression = selectExpressions.get(i);
+ if (expression.evaluate(result, ptr)) {
+ PColumn column = projectedColumns.get(i);
+ Object value = expression.getDataType()
+ .toObject(ptr, column.getSortOrder());
+ // We are guaranteed that the two column will have the
+ // same type.
+ if (!column.getDataType().isSizeCompatible(ptr, value,
+ column.getDataType(), expression.getMaxLength(),
+ expression.getScale(), column.getMaxLength(),
+ column.getScale())) {
+ throw new ValueTypeIncompatibleException(
+ column.getDataType(), column.getMaxLength(),
+ column.getScale());
+ }
+ column.getDataType().coerceBytes(ptr, value,
+ expression.getDataType(), expression.getMaxLength(),
+ expression.getScale(), expression.getSortOrder(),
+ column.getMaxLength(), column.getScale(),
+ column.getSortOrder());
+ byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ row.setValue(column, bytes);
}
- column.getDataType().coerceBytes(ptr, value, expression.getDataType(),
- expression.getMaxLength(), expression.getScale(), expression.getSortOrder(),
- column.getMaxLength(), column.getScale(), column.getSortOrder());
- byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
- row.setValue(column, bytes);
+ }
+ for (Mutation mutation : row.toRowMutations()) {
+ mutations.add(mutation);
+ }
+ for (i = 0; i < selectExpressions.size(); i++) {
+ selectExpressions.get(i).reset();
+ }
+ } else if (deleteCF != null && deleteCQ != null) {
+ // No need to search for delete column, since we project only it
+ // if no empty key value is being set
+ if (emptyCF == null ||
+ result.getValue(deleteCF, deleteCQ) != null) {
+ Delete delete = new Delete(results.get(0).getRowArray(),
+ results.get(0).getRowOffset(),
+ results.get(0).getRowLength());
+ delete.deleteColumns(deleteCF, deleteCQ, ts);
+ mutations.add(delete);
}
}
- for (Mutation mutation : row.toRowMutations()) {
- mutations.add(mutation);
- }
- for (i = 0; i < selectExpressions.size(); i++) {
- selectExpressions.get(i).reset();
+ if (emptyCF != null) {
+ /*
+ * If we've specified an emptyCF, then we need to insert an empty
+ * key value "retroactively" for any key value that is visible at
+ * the timestamp that the DDL was issued. Key values that are not
+ * visible at this timestamp will not ever be projected up to
+ * scans past this timestamp, so don't need to be considered.
+ * We insert one empty key value per row per timestamp.
+ */
+ Set<Long> timeStamps =
+ Sets.newHashSetWithExpectedSize(results.size());
+ for (Cell kv : results) {
+ long kvts = kv.getTimestamp();
+ if (!timeStamps.contains(kvts)) {
+ Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+ kv.getRowLength());
+ put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ mutations.add(put);
+ }
+ }
}
- } else if (deleteCF != null && deleteCQ != null) {
- // No need to search for delete column, since we project only it
- // if no empty key value is being set
- if (emptyCF == null || result.getValue(deleteCF, deleteCQ) != null) {
- Delete delete = new Delete(results.get(0).getRowArray(), results.get(0).getRowOffset(),
- results.get(0).getRowLength());
- delete.deleteColumns(deleteCF, deleteCQ, ts);
- mutations.add(delete);
+ // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+ if (!mutations.isEmpty() && batchSize > 0 &&
+ mutations.size() % batchSize == 0) {
+ commitBatch(region, mutations, indexUUID);
+ mutations.clear();
}
- }
- if (emptyCF != null) {
- /*
- * If we've specified an emptyCF, then we need to insert an empty
- * key value "retroactively" for any key value that is visible at
- * the timestamp that the DDL was issued. Key values that are not
- * visible at this timestamp will not ever be projected up to
- * scans past this timestamp, so don't need to be considered.
- * We insert one empty key value per row per timestamp.
- */
- Set<Long> timeStamps = Sets.newHashSetWithExpectedSize(results.size());
- for (Cell kv : results) {
- long kvts = kv.getTimestamp();
- if (!timeStamps.contains(kvts)) {
- Put put = new Put(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
- put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
- mutations.add(put);
- }
+ // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+ if (!indexMutations.isEmpty() && batchSize > 0 &&
+ indexMutations.size() % batchSize == 0) {
+ commitIndexMutations(c, region, indexMutations);
}
+ } catch (ConstraintViolationException e) {
+ // Log and ignore in count
+ logger.error(LogUtil.addCustomAnnotations("Failed to create row in " +
+ region.getRegionNameAsString() + " with values " +
+ SchemaUtil.toString(values),
+ ScanUtil.getCustomAnnotations(scan)), e);
+ continue;
}
- // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
- if (!mutations.isEmpty() && batchSize > 0 && mutations.size() % batchSize == 0) {
- commitBatch(region, mutations, indexUUID);
- mutations.clear();
- }
- // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
- if (!indexMutations.isEmpty() && batchSize > 0 && indexMutations.size() % batchSize == 0) {
- commitIndexMutations(c, region, indexMutations);
- }
-
- } catch (ConstraintViolationException e) {
- // Log and ignore in count
- logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), ScanUtil.getCustomAnnotations(scan)), e);
- continue;
+ aggregators.aggregate(rowAggregators, result);
+ hasAny = true;
}
- aggregators.aggregate(rowAggregators, result);
- hasAny = true;
- }
- } while (hasMore);
+ } while (hasMore);
+ }
} finally {
try {
if (stats != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index a0bd7c5..b89c807 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -78,14 +78,16 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
// Run through the scanner using internal nextRaw method
region.startRegionOperation();
try {
- boolean hasMore;
- do {
- List<Cell> results = Lists.newArrayList();
- // Results are potentially returned even when the return value of s.next is false
- // since this is an indication of whether or not there are more values after the
- // ones returned
- hasMore = scanner.nextRaw(results);
- } while (hasMore);
+ synchronized (scanner) {
+ boolean hasMore;
+ do {
+ List<Cell> results = Lists.newArrayList();
+ // Results are potentially returned even when the return value of s.next is
+ // false since this is an indication of whether or not there are more values
+ // after the ones returned
+ hasMore = scanner.nextRaw(results);
+ } while (hasMore);
+ }
} finally {
try {
scanner.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8d014cba/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index bff0936..88e141a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -38,23 +38,27 @@ public class RegionScannerResultIterator extends BaseResultIterator {
@Override
public Tuple next() throws SQLException {
- try {
- // TODO: size
- List<Cell> results = new ArrayList<Cell>();
- // Results are potentially returned even when the return value of s.next is false
- // since this is an indication of whether or not there are more values after the
- // ones returned
- boolean hasMore = scanner.nextRaw(results);
- if (!hasMore && results.isEmpty()) {
- return null;
+ // XXX: No access here to the region instance to enclose this with startRegionOperation /
+ // stopRegionOperation
+ synchronized (scanner) {
+ try {
+ // TODO: size
+ List<Cell> results = new ArrayList<Cell>();
+ // Results are potentially returned even when the return value of s.next is false
+ // since this is an indication of whether or not there are more values after the
+ // ones returned
+ boolean hasMore = scanner.nextRaw(results);
+ if (!hasMore && results.isEmpty()) {
+ return null;
+ }
+ // We instantiate a new tuple because in all cases currently we hang on to it
+ // (i.e. to compute and hold onto the TopN).
+ MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+ tuple.setKeyValues(results);
+ return tuple;
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
}
- // We instantiate a new tuple because in all cases currently we hang on to it (i.e.
- // to compute and hold onto the TopN).
- MultiKeyValueTuple tuple = new MultiKeyValueTuple();
- tuple.setKeyValues(results);
- return tuple;
- } catch (IOException e) {
- throw ServerUtil.parseServerException(e);
}
}