You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/04/20 22:04:22 UTC

phoenix git commit: Keep table point deletes as index point deletes (WIP)

Repository: phoenix
Updated Branches:
  refs/heads/txn ea523e7c5 -> 4906b8b2b


Keep table point deletes as index point deletes (WIP)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4906b8b2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4906b8b2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4906b8b2

Branch: refs/heads/txn
Commit: 4906b8b2b41b5c67fe6aa345a4311290e4da7c4b
Parents: ea523e7
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Apr 20 13:04:17 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Apr 20 13:04:17 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/index/IndexMaintainer.java   | 36 ++++++++++++++++----
 .../index/PhoenixTransactionalIndexer.java      | 13 +++----
 2 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4906b8b2/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 3500dd2..19dfaa0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -781,8 +781,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return put;
     }
 
-    public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
+    private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
+    private DeleteType getDeleteTypeOrNull(Collection<Cell> pendingUpdates) {
         int nDeleteCF = 0;
+        int nDeleteVersionCF = 0;
         for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
                 nDeleteCF++;
@@ -790,12 +792,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                   dataEmptyKeyValueCF, 0, dataEmptyKeyValueCF.length) == 0;
                 // This is what a delete looks like on the client side for immutable indexing...
                 if (isEmptyCF) {
-                    return true;
+                    return DeleteType.ALL_VERSIONS;
                 }
+            } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
+                nDeleteVersionCF++;
             }
         }
         // This is what a delete looks like on the server side for mutable indexing...
-        return nDeleteCF == this.nDataCFs;
+        // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not
+        return nDeleteVersionCF >= this.nDataCFs ? DeleteType.SINGLE_VERSION : nDeleteCF + nDeleteVersionCF >= this.nDataCFs ? DeleteType.ALL_VERSIONS : null;
+    }
+    
+    public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
+        return getDeleteTypeOrNull(pendingUpdates) != null;
     }
     
     private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<Cell> pendingUpdates) throws IOException {
@@ -841,8 +850,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
         // Delete the entire row if any of the indexed columns changed
-        if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
-            Delete delete = new Delete(indexRowKey, ts);
+        DeleteType deleteType = null;
+        if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
+            Delete delete;
+            // If table delete was single version, then index delete should be as well
+            if (deleteType == DeleteType.SINGLE_VERSION) {
+                delete = new Delete(indexRowKey);
+                for (ColumnReference ref : getAllColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+                    delete.deleteFamilyVersion(ref.getFamily(), ts);
+                }
+            } else {
+                delete = new Delete(indexRowKey, ts);
+            }
             delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
             return delete;
         }
@@ -856,7 +875,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                    delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    // If point delete for data table, then use point delete for index as well
+                    if (kv.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+                        delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    } else {
+                        delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4906b8b2/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 6a13552..38d6fd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -289,26 +288,28 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         }
 
         public void applyMutation() {
-            if (mutation instanceof Delete) {
+            /*if (mutation instanceof Delete) {
                 valueMap.clear();
-            } else {
+            } else */ {
                 for (Cell cell : pendingUpdates) {
-                    if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+                    if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
                         ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                         valueMap.remove(ref);
-                    } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+                    } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
                         for (ColumnReference ref : indexedColumns) {
                             if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
                                 valueMap.remove(ref);
                             }
                         }
-                    } else {
+                    } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){
                         ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                         if (indexedColumns.contains(ref)) {
                             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                             ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                             valueMap.put(ref, ptr);
                         }
+                    } else {
+                        throw new IllegalStateException("Unexpected mutation type for " + cell);
                     }
                 }
             }