You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2024/01/22 06:06:01 UTC
(phoenix) branch PHOENIX-7001-feature updated: Add an extra delete mutation for CDC
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
new da6ddad04e Add an extra delete mutation for CDC
da6ddad04e is described below
commit da6ddad04e253a27ddc898b3a189f8f6934ad93e
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Mon Jan 22 11:35:24 2024 +0530
Add an extra delete mutation for CDC
---
.../phoenix/hbase/index/IndexRegionObserver.java | 25 ++++++++++++++++------
.../org/apache/phoenix/index/IndexMaintainer.java | 12 +++++++++++
.../src/main/protobuf/ServerCachingService.proto | 1 +
3 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 593fdbe0d0..702f4da82a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -899,6 +899,13 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
}
}
+ private Mutation getDeleteIndexMutation(Put dataRowState, IndexMaintainer indexMaintainer,
+ long ts, ImmutableBytesPtr rowKeyPtr) {
+ ValueGetter cdcDataRowVG = new IndexUtil.SimpleValueGetter(dataRowState);
+ byte[] indexRowKey = indexMaintainer.buildRowKey(cdcDataRowVG, rowKeyPtr, null, null, ts);
+ return indexMaintainer.buildRowDeleteMutation(indexRowKey,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+ }
/**
* Generate the index update for a data row from the mutation that are obtained by merging the previous data row
@@ -960,13 +967,19 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
}
} else if (currentDataRowState != null
&& indexMaintainer.shouldPrepareIndexMutations(currentDataRowState)) {
- ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState);
- byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
- null, null, ts);
- Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
- IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
context.indexUpdates.put(hTableInterfaceReference,
- new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+ new Pair<Mutation, byte[]>(getDeleteIndexMutation(currentDataRowState,
+ indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
+ if (indexMaintainer.isCDCIndex()) {
+ Put cdcDataRowState = new Put(currentDataRowState.getRow());
+ cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
+ indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ context.indexUpdates.put(hTableInterfaceReference,
+ new Pair<Mutation, byte[]>(getDeleteIndexMutation(cdcDataRowState,
+ indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get()));
+
+ }
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index c943cb442e..002857b213 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -116,6 +116,7 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.util.BitSet;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ExpressionUtil;
@@ -437,6 +438,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
private boolean isUncovered;
private Expression indexWhere;
private Set<ColumnReference> indexWhereColumns;
+ private boolean isCDCIndex;
protected IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
this.dataRowKeySchema = dataRowKeySchema;
@@ -674,6 +676,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
this.indexWhere = index.getIndexWhereExpression(connection);
this.indexWhereColumns = index.getIndexWhereColumns(connection);
}
+ this.isCDCIndex = CDCUtil.isACDCIndex(index);
initCachedState();
}
@@ -1773,6 +1776,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
maintainer.indexWhere = null;
maintainer.indexWhereColumns = null;
}
+ if (proto.hasIsCDCIndex()) {
+ maintainer.isCDCIndex = proto.getIsCDCIndex();
+ } else {
+ maintainer.isCDCIndex = false;
+ }
maintainer.initCachedState();
return maintainer;
}
@@ -1916,6 +1924,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
}
}
}
+ builder.setIsCDCIndex(maintainer.isCDCIndex);
return builder.build();
}
@@ -2239,6 +2248,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
public boolean isUncovered() {
return isUncovered;
}
+ public boolean isCDCIndex() {
+ return isCDCIndex;
+ }
public boolean isImmutableRows() {
return immutableRows;
diff --git a/phoenix-core/src/main/protobuf/ServerCachingService.proto b/phoenix-core/src/main/protobuf/ServerCachingService.proto
index 24717fdb14..c28695ff7d 100644
--- a/phoenix-core/src/main/protobuf/ServerCachingService.proto
+++ b/phoenix-core/src/main/protobuf/ServerCachingService.proto
@@ -71,6 +71,7 @@ message IndexMaintainer {
optional bool isUncovered = 28;
optional bytes indexWhere = 29;
repeated ColumnReference indexWhereColumns = 30;
+ optional bool isCDCIndex = 31;
}
message TransformMaintainer {