You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/02/12 19:54:30 UTC
phoenix git commit: PHOENIX-2657 Transactionally deleted cells become
visible after few hours
Repository: phoenix
Updated Branches:
refs/heads/master 0c21539cc -> 980eb36e2
PHOENIX-2657 Transactionally deleted cells become visible after few hours
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/980eb36e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/980eb36e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/980eb36e
Branch: refs/heads/master
Commit: 980eb36e251e917343d1376b2ac6f8c57d223c35
Parents: 0c21539
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Feb 12 09:38:43 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Feb 12 09:38:43 2016 -0800
----------------------------------------------------------------------
.../apache/phoenix/filter/SkipScanFilter.java | 47 +++++++++++++-------
.../phoenix/index/PhoenixIndexBuilder.java | 4 +-
.../index/PhoenixTransactionalIndexer.java | 5 ++-
3 files changed, 38 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/980eb36e/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 00320ce..c966d91 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -43,8 +43,6 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ScanUtil.BytesComparator;
import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
@@ -63,8 +61,6 @@ import com.google.common.hash.Hashing;
* @since 0.1
*/
public class SkipScanFilter extends FilterBase implements Writable {
- private static final Logger logger = LoggerFactory.getLogger(SkipScanFilter.class);
-
private enum Terminate {AT, AFTER};
// Conjunctive normal form of or-ed ranges or point lookups
private List<List<KeyRange>> slots;
@@ -72,6 +68,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
private int[] slotSpan;
// schema of the row key
private RowKeySchema schema;
+ private boolean includeMultipleVersions;
// current position for each slot
private int[] position;
// buffer used for skip hint
@@ -94,19 +91,27 @@ public class SkipScanFilter extends FilterBase implements Writable {
public SkipScanFilter() {
}
+ public SkipScanFilter(SkipScanFilter filter, boolean includeMultipleVersions) {
+ this(filter.slots, filter.slotSpan, filter.schema, includeMultipleVersions);
+ }
+
public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema) {
this(slots, ScanUtil.getDefaultSlotSpans(slots.size()), schema);
}
public SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema) {
- init(slots, slotSpan, schema);
+ this(slots, slotSpan, schema, false);
+ }
+
+ private SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, boolean includeMultipleVersions) {
+ init(slots, slotSpan, schema, includeMultipleVersions);
}
public void setOffset(int offset) {
this.offset = offset;
}
- private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema) {
+ private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema schema, boolean includeMultipleVersions) {
for (List<KeyRange> ranges : slots) {
if (ranges.isEmpty()) {
throw new IllegalStateException();
@@ -117,9 +122,10 @@ public class SkipScanFilter extends FilterBase implements Writable {
this.schema = schema;
this.maxKeyLength = SchemaUtil.getMaxKeyLength(schema, slots);
this.position = new int[slots.size()];
- startKey = new byte[maxKeyLength];
- endKey = new byte[maxKeyLength];
- endKeyLength = 0;
+ this.startKey = new byte[maxKeyLength];
+ this.endKey = new byte[maxKeyLength];
+ this.endKeyLength = 0;
+ this.includeMultipleVersions = includeMultipleVersions;
}
// Exposed for testing.
@@ -345,15 +351,20 @@ public class SkipScanFilter extends FilterBase implements Writable {
return i;
}
+ private ReturnCode getIncludeReturnCode() {
+ return includeMultipleVersions ? ReturnCode.INCLUDE : ReturnCode.INCLUDE_AND_NEXT_COL;
+ }
+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="QBA_QUESTIONABLE_BOOLEAN_ASSIGNMENT",
justification="Assignment designed to work this way.")
private ReturnCode navigate(final byte[] currentKey, final int offset, final int length, Terminate terminate) {
int nSlots = slots.size();
+
// First check to see if we're in-range until we reach our end key
if (endKeyLength > 0) {
if (Bytes.compareTo(currentKey, offset, length, endKey, 0, endKeyLength) < 0) {
- return ReturnCode.INCLUDE_AND_NEXT_COL;
+ return getIncludeReturnCode();
}
// If key range of last slot is a single key, we can increment our position
@@ -485,7 +496,7 @@ public class SkipScanFilter extends FilterBase implements Writable {
// up to the upper range of our last slot. We do this for ranges and single keys
// since we potentially have multiple key values for the same row key.
setEndKey(ptr, minOffset, i);
- return ReturnCode.INCLUDE_AND_NEXT_COL;
+ return getIncludeReturnCode();
}
private boolean allTrailingNulls(int i) {
@@ -559,9 +570,14 @@ public class SkipScanFilter extends FilterBase implements Writable {
RowKeySchema schema = new RowKeySchema();
schema.readFields(in);
int andLen = in.readInt();
+ boolean includeMultipleVersions = false;
+ if (andLen < 0) {
+ andLen = -andLen;
+ includeMultipleVersions = true;
+ }
int[] slotSpan = new int[andLen];
List<List<KeyRange>> slots = Lists.newArrayListWithExpectedSize(andLen);
- for (int i=0; i<andLen; i++) {
+ for (int i = 0; i < andLen; i++) {
int orLenWithSlotSpan = in.readInt();
int orLen = orLenWithSlotSpan;
/*
@@ -582,15 +598,16 @@ public class SkipScanFilter extends FilterBase implements Writable {
orClause.add(range);
}
}
- this.init(slots, slotSpan, schema);
+ this.init(slots, slotSpan, schema, includeMultipleVersions);
}
@Override
public void write(DataOutput out) throws IOException {
assert(slots.size() == slotSpan.length);
schema.write(out);
- out.writeInt(slots.size());
- for (int i = 0; i < slots.size(); i++) {
+ int nSlots = slots.size();
+ out.writeInt(this.includeMultipleVersions ? -nSlots : nSlots);
+ for (int i = 0; i < nSlots; i++) {
List<KeyRange> orLen = slots.get(i);
int span = slotSpan[i];
int orLenWithSlotSpan = -( ( (span << KEY_RANGE_LENGTH_BITS) | orLen.size() ) + 1);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/980eb36e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 0601e0a..27d2122 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -90,10 +91,9 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
}
if (maintainers.isEmpty()) return;
Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
- scan.setRaw(true);
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
scanRanges.initializeScan(scan);
- scan.setFilter(scanRanges.getSkipScanFilter());
+ scan.setFilter(new SkipScanFilter(scanRanges.getSkipScanFilter(),true));
Region region = env.getRegion();
RegionScanner scanner = region.getScanner(scan);
// Run through the scanner using internal nextRaw method
http://git-wip-us.apache.org/repos/asf/phoenix/blob/980eb36e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index f7684a6..26f9725 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -53,6 +53,7 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.MultiMutation;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
@@ -226,7 +227,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
scanRanges.initializeScan(scan);
- scan.setFilter(scanRanges.getSkipScanFilter());
TableName tableName = env.getRegion().getRegionInfo().getTable();
HTableInterface htable = env.getTable(tableName);
txTable = new TransactionAwareHTable(htable);
@@ -234,9 +234,12 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
// For rollback, we need to see all versions, including
// the last committed version as there may be multiple
// checkpointed versions.
+ SkipScanFilter filter = scanRanges.getSkipScanFilter();
if (isRollback) {
+ filter = new SkipScanFilter(filter,true);
tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL);
}
+ scan.setFilter(filter);
currentScanner = txTable.getScanner(scan);
}
if (isRollback) {