You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/12/08 21:30:04 UTC
[phoenix] branch 4.x updated: [PHOENIX-6213] Extend Cell Tags to
Delete object to store source of operation.
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new e1ee29d [PHOENIX-6213] Extend Cell Tags to Delete object to store source of operation.
e1ee29d is described below
commit e1ee29d4e68cefd2639eca88ed27cec0bc714eef
Author: Rushabh <ru...@salesforce.com>
AuthorDate: Mon Dec 7 16:08:43 2020 -0800
[PHOENIX-6213] Extend Cell Tags to Delete object to store source of operation.
---
.../java/org/apache/phoenix/end2end/DeleteIT.java | 178 ++++++++++++++++++++-
.../org/apache/hadoop/hbase/PhoenixTagType.java | 29 ++++
.../java/org/apache/phoenix/call/CallRunner.java | 1 -
.../org/apache/phoenix/compile/DeleteCompiler.java | 15 +-
.../UngroupedAggregateRegionScanner.java | 13 +-
.../org/apache/phoenix/execute/MutationState.java | 9 ++
.../phoenix/hbase/index/IndexRegionObserver.java | 44 +++++
.../org/apache/phoenix/jdbc/PhoenixConnection.java | 10 ++
.../org/apache/phoenix/query/QueryServices.java | 7 +
.../apache/phoenix/compile/QueryOptimizerTest.java | 4 +-
10 files changed, 299 insertions(+), 11 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index f6f99d9..0c71b1d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -18,12 +18,13 @@
package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.printResultSet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
@@ -32,14 +33,31 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PhoenixTagType;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.DeleteCompiler;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.DeleteStatement;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -943,6 +961,162 @@ public class DeleteIT extends ParallelStatsDisabledIT {
}
}
}
-}
+ /*
+ Tests whether we have cell tags in delete marker for
+ ClientSelectDeleteMutationPlan.
+ */
+ @Test
+ public void testDeleteClientDeleteMutationPlan() throws Exception {
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ String tagValue = "customer-delete";
+ String delete = "DELETE FROM " + tableName + " WHERE v1 = 'foo'";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // Add tag "customer-delete" to delete marker.
+ props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+
+ createAndUpsertTable(tableName, indexName, props);
+ // Make sure that the plan creates is of ClientSelectDeleteMutationPlan
+ verifyDeletePlan(delete, DeleteCompiler.ClientSelectDeleteMutationPlan.class, props);
+ executeDelete(delete, props, 1);
+ String startRowKeyForBaseTable = "1";
+ String startRowKeyForIndexTable = "foo";
+ // Make sure that Delete Marker has cell tag for base table
+ // and has no cell tag for index table.
+ checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+ checkTagPresentInDeleteMarker(indexName, startRowKeyForIndexTable, false, null);
+ }
+
+ /*
+ Tests whether we have cell tags in delete marker for
+ ServerSelectDeleteMutationPlan.
+ */
+ @Test
+ public void testDeleteServerDeleteMutationPlan() throws Exception {
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ String tagValue = "customer-delete";
+ String delete = "DELETE FROM " + tableName;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+
+ createAndUpsertTable(tableName, indexName, props);
+ // Make sure that the plan creates is of ServerSelectDeleteMutationPlan
+ verifyDeletePlan(delete, DeleteCompiler.ServerSelectDeleteMutationPlan.class, props);
+ executeDelete(delete, props, 2);
+
+ String startRowKeyForBaseTable = "1";
+ String startRowKeyForIndexTable = "foo";
+ // Make sure that Delete Marker has cell tag for base table
+ // and has no cell tag for index table.
+ checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+ checkTagPresentInDeleteMarker(indexName, startRowKeyForIndexTable, false, null);
+ }
+ /*
+ Tests whether we have cell tags in delete marker for
+ MultiRowDeleteMutationPlan.
+ */
+ @Test
+ public void testDeleteMultiRowDeleteMutationPlan() throws Exception {
+ String tableName = generateUniqueName();
+ String tagValue = "customer-delete";
+ String delete = "DELETE FROM " + tableName + " WHERE k = 1";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+ // Don't create index table. We will use MultiRowDeleteMutationPlan
+ // if there is no index present for a table.
+ createAndUpsertTable(tableName, null, props);
+ // Make sure that the plan creates is of MultiRowDeleteMutationPlan
+ verifyDeletePlan(delete, DeleteCompiler.MultiRowDeleteMutationPlan.class, props);
+ executeDelete(delete, props, 1);
+ String startRowKeyForBaseTable = "1";
+ // Make sure that Delete Marker has cell tag for base table.
+ // We haven't created index table for this test case.
+ checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+ }
+
+ /*
+ Verify whether plan that we create for delete statement is of planClass
+ */
+ private void verifyDeletePlan(String delete, Class<? extends MutationPlan> planClass,
+ Properties props) throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+ SQLParser parser = new SQLParser(delete);
+ DeleteStatement deleteStmt = (DeleteStatement) parser.parseStatement();
+ DeleteCompiler compiler = new DeleteCompiler(stmt, null);
+ MutationPlan plan = compiler.compile(deleteStmt);
+ assertEquals(plan.getClass(), planClass);
+ }
+ }
+ private void createAndUpsertTable(String tableName, String indexName, Properties props)
+ throws SQLException {
+ String ddl = "CREATE TABLE " + tableName +
+ " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)";
+ try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ try (Statement statement = conn.createStatement()) {
+ statement.execute(ddl);
+ if (indexName != null) {
+ String indexDdl1 = "CREATE INDEX " + indexName + " ON " + tableName + "(v1,v2)";
+ statement.execute(indexDdl1);
+ }
+ statement.execute(
+ "upsert into " + tableName + " values (1, 'foo', 'foo1')");
+ statement.execute(
+ "upsert into " + tableName + " values (2, 'bar', 'bar1')");
+ conn.commit();
+ }
+ }
+ }
+
+ private void executeDelete(String delete, Properties props, int deleteRowCount)
+ throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ try (Statement statement = conn.createStatement()) {
+ int rs = statement.executeUpdate(delete);
+ assertEquals( deleteRowCount, rs);
+ }
+ }
+ }
+
+ /*
+ Verify whether we have tags present for base table and not present for
+ index tables.
+ */
+ private void checkTagPresentInDeleteMarker(String tableName, String startRowKey,
+ boolean tagPresent, String tagValue) throws IOException {
+ List<Cell> values = new ArrayList<>();
+ TableName table = TableName.valueOf(tableName);
+ // Scan table with specified startRowKey
+ for (HRegion region : getUtility().getHBaseCluster().getRegions(table)) {
+ Scan scan = new Scan();
+ // Make sure to set rawScan to true so that we will get Delete Markers.
+ scan.setRaw(true);
+ scan.setStartRow(Bytes.toBytes(startRowKey));
+ RegionScanner scanner = region.getScanner(scan);
+ scanner.next(values);
+ if (!values.isEmpty()) {
+ break;
+ }
+ }
+ assertFalse("Values shouldn't be empty", values.isEmpty());
+ Cell first = values.get(0);
+ assertTrue("First cell should be delete marker ", CellUtil.isDelete(first));
+ List<Tag> tags = Tag.asList(first.getTagsArray(),
+ first.getTagsOffset(), first.getTagsLength());
+ if (tagPresent) {
+ assertEquals(1, tags.size());
+ Tag sourceOfOperationTag = Tag.getTag(first.getTagsArray(), first.getTagsOffset(),
+ first.getTagsLength(), PhoenixTagType.SOURCE_OPERATION_TAG_TYPE);
+ Assert.assertNotNull(sourceOfOperationTag);
+ assertEquals(tagValue, Bytes.toString(sourceOfOperationTag.getValue()));
+ } else {
+ assertEquals(0, tags.size());
+ }
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/PhoenixTagType.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/PhoenixTagType.java
new file mode 100644
index 0000000..da2064e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/PhoenixTagType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/**
+ Used to persist the TagType in HBase Cell Tags.
+ All the type present here should be more than @{@link Tag#CUSTOM_TAG_TYPE_RANGE} which is 64.
+ **/
+public final class PhoenixTagType {
+ /**
+ * Indicates the source of operation.
+ */
+ public static final byte SOURCE_OPERATION_TAG_TYPE = (byte) 65;
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
index face677..69807c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
@@ -62,5 +62,4 @@ public class CallRunner {
}
}
}
-
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index ebba909..ca573dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -22,7 +22,6 @@ import static org.apache.phoenix.util.NumberUtil.add;
import java.io.IOException;
import java.sql.ParameterMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -351,7 +351,7 @@ public class DeleteCompiler {
return Collections.emptyList();
}
- private class MultiRowDeleteMutationPlan implements MutationPlan {
+ public class MultiRowDeleteMutationPlan implements MutationPlan {
private final List<MutationPlan> plans;
private final MutationPlan firstPlan;
private final QueryPlan dataPlan;
@@ -621,7 +621,7 @@ public class DeleteCompiler {
final int offset = table.getBucketNum() == null ? 0 : 1;
Iterator<PColumn> projectedColsItr = projectedColumns.iterator();
int i = 0;
- while(projectedColsItr.hasNext()) {
+ while (projectedColsItr.hasNext()) {
final int position = i++;
adjustedProjectedColumns.add(new DelegateColumn(projectedColsItr.next()) {
@Override
@@ -742,7 +742,7 @@ public class DeleteCompiler {
}
}
- private class ServerSelectDeleteMutationPlan implements MutationPlan {
+ public class ServerSelectDeleteMutationPlan implements MutationPlan {
private final StatementContext context;
private final QueryPlan dataPlan;
private final PhoenixConnection connection;
@@ -802,6 +802,11 @@ public class DeleteCompiler {
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION);
+ String sourceOfDelete = statement.getConnection().getSourceOfOperation();
+ if (sourceOfDelete != null) {
+ context.getScan().setAttribute(QueryServices.SOURCE_OPERATION_ATTRIB,
+ Bytes.toBytes(sourceOfDelete));
+ }
}
ResultIterator iterator = aggPlan.iterator();
try {
@@ -860,7 +865,7 @@ public class DeleteCompiler {
}
}
- private class ClientSelectDeleteMutationPlan implements MutationPlan {
+ public class ClientSelectDeleteMutationPlan implements MutationPlan {
private final StatementContext context;
private final TableRef targetTableRef;
private final QueryPlan dataPlan;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 362d08c..7f4e73c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
@@ -430,6 +431,12 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
if (replayMutations != null) {
delete.setAttribute(REPLAY_WRITES, replayMutations);
}
+ byte[] sourceOperationBytes =
+ scan.getAttribute(SOURCE_OPERATION_ATTRIB);
+ if (sourceOperationBytes != null) {
+ delete.setAttribute(SOURCE_OPERATION_ATTRIB, sourceOperationBytes);
+ }
+
mutations.add(delete);
// force tephra to ignore this deletes
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
@@ -446,6 +453,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
delete.deleteColumns(deleteCF, deleteCQ, ts);
// force tephra to ignore this deletes
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+ // TODO: We need to set SOURCE_OPERATION_ATTRIB here also. The control will come here if
+ // TODO: we drop a column. We also delete metadata from SYSCAT table for the dropped column
+ // TODO: and delete the column. In short, we need to set this attribute for the DM for SYSCAT metadata
+ // TODO: and for data table rows.
mutations.add(delete);
}
}
@@ -469,7 +480,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
SortOrder.invert(values[i], 0, values[i], 0,
values[i].length);
}
- }else{
+ } else {
values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 207c2bf..d1f97d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.execute;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
@@ -646,6 +647,14 @@ public class MutationState implements SQLCloseable {
if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
row.delete();
rowMutations = row.toRowMutations();
+ String sourceOfDelete = getConnection().getSourceOfOperation();
+ if (sourceOfDelete != null) {
+ byte[] sourceOfDeleteBytes = Bytes.toBytes(sourceOfDelete);
+ // Set the source of operation attribute.
+ for (Mutation mutation: rowMutations) {
+ mutation.setAttribute(SOURCE_OPERATION_ATTRIB, sourceOfDeleteBytes);
+ }
+ }
// The DeleteCompiler already generates the deletes for indexes, so no need to do it again
rowMutationsPertainingToIndex = Collections.emptyList();
} else {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 93c9766..2a7747d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,11 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.PhoenixTagType;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagRewriteCell;
+import org.apache.phoenix.query.QueryServices;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -922,6 +927,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
setBatchMutateContext(c, context);
+ // Need to add cell tags to Delete Marker before we do any index processing
+ // since we add tags to tables which doesn't have indexes also.
+ setDeleteAttributes(miniBatchOp);
/*
* Exclusively lock all rows so we get a consistent read
@@ -978,6 +986,42 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
}
+ /**
+ * Set Cell Tags to delete markers with source of operation attribute.
+ * @param miniBatchOp
+ * @throws IOException
+ */
+ private void setDeleteAttributes(MiniBatchOperationInProgress<Mutation> miniBatchOp)
+ throws IOException {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ if (!(m instanceof Delete)) {
+ // Ignore if it is not Delete type.
+ continue;
+ }
+ byte[] sourceOpAttr = m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
+ if (sourceOpAttr == null) {
+ continue;
+ }
+ Tag sourceOpTag = new Tag(PhoenixTagType.SOURCE_OPERATION_TAG_TYPE, sourceOpAttr);
+ List<Cell> updatedCells = new ArrayList<>();
+ for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
+ Cell cell = cellScanner.current();
+ List<Tag> tags = Tag.asList(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLength());
+ tags.add(sourceOpTag);
+ Cell updatedCell = new TagRewriteCell(cell, Tag.fromList(tags));
+ updatedCells.add(updatedCell);
+ }
+ m.getFamilyCellMap().clear();
+ // Clear and add new Cells to the Mutation.
+ for (Cell cell : updatedCells) {
+ Delete d = (Delete) m;
+ d.addDeleteMarker(cell);
+ }
+ }
+ }
+
private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
this.batchMutateContext.set(context);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d7adcf4..cd4eb15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -173,6 +173,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
private boolean isRunningUpgrade;
private LogLevel logLevel;
private Double logSamplingRate;
+ private String sourceOfOperation;
private Object queueCreationLock = new Object(); // lock for the lazy init path of childConnections structure
private ConcurrentLinkedQueue<PhoenixConnection> childConnections = null;
@@ -403,6 +404,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
} else {
GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
}
+ this.sourceOfOperation =
+ this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
}
private static void checkScn(Long scnParam) throws SQLException {
@@ -1360,4 +1363,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
return this.logSamplingRate;
}
+ /**
+ *
+ * @return source of operation
+ */
+ public String getSourceOfOperation() {
+ return sourceOfOperation;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 52cb2b0..893f5d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -363,6 +363,13 @@ public interface QueryServices extends SQLCloseable {
public static final String GUIDE_POSTS_CACHE_FACTORY_CLASS = "phoenix.guide.posts.cache.factory.class";
public static final String PENDING_MUTATIONS_DDL_THROW_ATTRIB = "phoenix.pending.mutations.before.ddl.throw";
+
+ /**
+ * Parameter to indicate the source of operation attribute.
+ * It can include metadata about the customer, service, etc.
+ */
+ String SOURCE_OPERATION_ATTRIB = "phoenix.source.operation";
+
/**
* Get executor service used for parallel scans
*/
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 4d5a424..5810fed 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -370,12 +370,12 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
DeleteCompiler compiler = new DeleteCompiler(stmt, null);
MutationPlan plan = compiler.compile(delete);
assertEquals("T", plan.getQueryPlan().getTableRef().getTable().getTableName().getString());
- assertTrue(plan.getClass().getName().contains("ServerSelectDeleteMutationPlan"));
+ assertEquals(plan.getClass(), DeleteCompiler.ServerSelectDeleteMutationPlan.class);
parser = new SQLParser("DELETE FROM t WHERE v1 = 'foo'");
delete = (DeleteStatement) parser.parseStatement();
plan = compiler.compile(delete);
assertEquals("IDX", plan.getQueryPlan().getTableRef().getTable().getTableName().getString());
- assertTrue(plan.getClass().getName().contains("ClientSelectDeleteMutationPlan"));
+ assertEquals(plan.getClass(), DeleteCompiler.ClientSelectDeleteMutationPlan.class);
}
@Test