You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2024/01/22 06:06:01 UTC

(phoenix) branch PHOENIX-7001-feature updated: Add an extra delete mutation for CDC

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
     new da6ddad04e Add an extra delete mutation for CDC
da6ddad04e is described below

commit da6ddad04e253a27ddc898b3a189f8f6934ad93e
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Mon Jan 22 11:35:24 2024 +0530

    Add an extra delete mutation for CDC
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 25 ++++++++++++++++------
 .../org/apache/phoenix/index/IndexMaintainer.java  | 12 +++++++++++
 .../src/main/protobuf/ServerCachingService.proto   |  1 +
 3 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 593fdbe0d0..702f4da82a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -899,6 +899,13 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
             }
         }
     }
+    private Mutation getDeleteIndexMutation(Put dataRowState, IndexMaintainer indexMaintainer,
+            long ts, ImmutableBytesPtr rowKeyPtr) {
+        ValueGetter cdcDataRowVG = new IndexUtil.SimpleValueGetter(dataRowState);
+        byte[] indexRowKey = indexMaintainer.buildRowKey(cdcDataRowVG, rowKeyPtr, null, null, ts);
+        return indexMaintainer.buildRowDeleteMutation(indexRowKey,
+                IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+    }
 
     /**
      * Generate the index update for a data row from the mutation that are obtained by merging the previous data row
@@ -960,13 +967,19 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                     }
                 } else if (currentDataRowState != null
                         && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
-                    ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState);
-                    byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
-                            null, null, ts);
-                    Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
-                            IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
                     context.indexUpdates.put(hTableInterfaceReference,
-                            new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+                            new Pair<Mutation, byte[]>(getDeleteIndexMutation(currentDataRowState,
+                                    indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
+                    if (indexMaintainer.isCDCIndex()) {
+                        Put cdcDataRowState = new Put(currentDataRowState.getRow());
+                        cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
+                                indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
+                                ByteUtil.EMPTY_BYTE_ARRAY);
+                        context.indexUpdates.put(hTableInterfaceReference,
+                                new Pair<Mutation, byte[]>(getDeleteIndexMutation(cdcDataRowState,
+                                        indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
+
+                    }
                 }
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index c943cb442e..002857b213 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -116,6 +116,7 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ExpressionUtil;
@@ -437,6 +438,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private boolean isUncovered;
     private Expression indexWhere;
     private Set<ColumnReference> indexWhereColumns;
+    private boolean isCDCIndex;
 
     protected IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
@@ -674,6 +676,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             this.indexWhere = index.getIndexWhereExpression(connection);
             this.indexWhereColumns = index.getIndexWhereColumns(connection);
         }
+        this.isCDCIndex = CDCUtil.isACDCIndex(index);
 
         initCachedState();
     }
@@ -1773,6 +1776,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             maintainer.indexWhere = null;
             maintainer.indexWhereColumns = null;
         }
+        if (proto.hasIsCDCIndex()) {
+            maintainer.isCDCIndex = proto.getIsCDCIndex();
+        } else {
+            maintainer.isCDCIndex = false;
+        }
         maintainer.initCachedState();
         return maintainer;
     }
@@ -1916,6 +1924,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 }
             }
         }
+        builder.setIsCDCIndex(maintainer.isCDCIndex);
         return builder.build();
     }
 
@@ -2239,6 +2248,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     public boolean isUncovered() {
         return isUncovered;
     }
+    public boolean isCDCIndex() {
+        return isCDCIndex;
+    }
     
     public boolean isImmutableRows() {
         return immutableRows;
diff --git a/phoenix-core/src/main/protobuf/ServerCachingService.proto b/phoenix-core/src/main/protobuf/ServerCachingService.proto
index 24717fdb14..c28695ff7d 100644
--- a/phoenix-core/src/main/protobuf/ServerCachingService.proto
+++ b/phoenix-core/src/main/protobuf/ServerCachingService.proto
@@ -71,6 +71,7 @@ message IndexMaintainer {
   optional bool isUncovered = 28;
   optional bytes indexWhere = 29;
   repeated ColumnReference indexWhereColumns = 30;
+  optional bool isCDCIndex = 31;
 }
 
 message TransformMaintainer {