You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/20 22:04:22 UTC
phoenix git commit: Keep table point deletes as index point deletes (WIP)
Repository: phoenix
Updated Branches:
refs/heads/txn ea523e7c5 -> 4906b8b2b
Keep table point deletes as index point deletes (WIP)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4906b8b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4906b8b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4906b8b2
Branch: refs/heads/txn
Commit: 4906b8b2b41b5c67fe6aa345a4311290e4da7c4b
Parents: ea523e7
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Apr 20 13:04:17 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Apr 20 13:04:17 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/index/IndexMaintainer.java | 36 ++++++++++++++++----
.../index/PhoenixTransactionalIndexer.java | 13 +++----
2 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4906b8b2/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 3500dd2..19dfaa0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -781,8 +781,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
return put;
}
- public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
+ private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
+ private DeleteType getDeleteTypeOrNull(Collection<Cell> pendingUpdates) {
int nDeleteCF = 0;
+ int nDeleteVersionCF = 0;
for (Cell kv : pendingUpdates) {
if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
nDeleteCF++;
@@ -790,12 +792,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
dataEmptyKeyValueCF, 0, dataEmptyKeyValueCF.length) == 0;
// This is what a delete looks like on the client side for immutable indexing...
if (isEmptyCF) {
- return true;
+ return DeleteType.ALL_VERSIONS;
}
+ } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
+ nDeleteVersionCF++;
}
}
// This is what a delete looks like on the server side for mutable indexing...
- return nDeleteCF == this.nDataCFs;
+ // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not
+ return nDeleteVersionCF >= this.nDataCFs ? DeleteType.SINGLE_VERSION : nDeleteCF + nDeleteVersionCF >= this.nDataCFs ? DeleteType.ALL_VERSIONS : null;
+ }
+
+ public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
+ return getDeleteTypeOrNull(pendingUpdates) != null;
}
private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<Cell> pendingUpdates) throws IOException {
@@ -841,8 +850,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
// Delete the entire row if any of the indexed columns changed
- if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
- Delete delete = new Delete(indexRowKey, ts);
+ DeleteType deleteType = null;
+ if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
+ Delete delete;
+ // If table delete was single version, then index delete should be as well
+ if (deleteType == DeleteType.SINGLE_VERSION) {
+ delete = new Delete(indexRowKey);
+ for (ColumnReference ref : getAllColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+ delete.deleteFamilyVersion(ref.getFamily(), ts);
+ }
+ } else {
+ delete = new Delete(indexRowKey, ts);
+ }
delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
return delete;
}
@@ -856,7 +875,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
delete = new Delete(indexRowKey);
delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
}
- delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+ // If point delete for data table, then use point delete for index as well
+ if (kv.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+ delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+ } else {
+ delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4906b8b2/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 6a13552..38d6fd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
@@ -289,26 +288,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
public void applyMutation() {
- if (mutation instanceof Delete) {
+ /*if (mutation instanceof Delete) {
valueMap.clear();
- } else {
+ } else */ {
for (Cell cell : pendingUpdates) {
- if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+ if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
valueMap.remove(ref);
- } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+ } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
for (ColumnReference ref : indexedColumns) {
if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
valueMap.remove(ref);
}
}
- } else {
+ } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){
ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
if (indexedColumns.contains(ref)) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
valueMap.put(ref, ptr);
}
+ } else {
+ throw new IllegalStateException("Unexpected mutation type for " + cell);
}
}
}