You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/12/08 21:30:04 UTC

[phoenix] branch 4.x updated: [PHOENIX-6213] Extend Cell Tags to Delete object to store source of operation.

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new e1ee29d  [PHOENIX-6213] Extend Cell Tags to Delete object to store source of operation.
e1ee29d is described below

commit e1ee29d4e68cefd2639eca88ed27cec0bc714eef
Author: Rushabh <ru...@salesforce.com>
AuthorDate: Mon Dec 7 16:08:43 2020 -0800

    [PHOENIX-6213] Extend Cell Tags to Delete object to store source of operation.
---
 .../java/org/apache/phoenix/end2end/DeleteIT.java  | 178 ++++++++++++++++++++-
 .../org/apache/hadoop/hbase/PhoenixTagType.java    |  29 ++++
 .../java/org/apache/phoenix/call/CallRunner.java   |   1 -
 .../org/apache/phoenix/compile/DeleteCompiler.java |  15 +-
 .../UngroupedAggregateRegionScanner.java           |  13 +-
 .../org/apache/phoenix/execute/MutationState.java  |   9 ++
 .../phoenix/hbase/index/IndexRegionObserver.java   |  44 +++++
 .../org/apache/phoenix/jdbc/PhoenixConnection.java |  10 ++
 .../org/apache/phoenix/query/QueryServices.java    |   7 +
 .../apache/phoenix/compile/QueryOptimizerTest.java |   4 +-
 10 files changed, 299 insertions(+), 11 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index f6f99d9..0c71b1d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -18,12 +18,13 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.printResultSet;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
@@ -32,14 +33,31 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PhoenixTagType;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.DeleteCompiler;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.DeleteStatement;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -943,6 +961,162 @@ public class DeleteIT extends ParallelStatsDisabledIT {
             }
         }
     }
-}
 
+    /*
+        Tests whether we have cell tags in delete marker for
+        ClientSelectDeleteMutationPlan.
+     */
+    @Test
+    public void testDeleteClientDeleteMutationPlan() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String tagValue = "customer-delete";
+        String delete = "DELETE FROM " + tableName + " WHERE v1 = 'foo'";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        // Add tag "customer-delete" to delete marker.
+        props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+
+        createAndUpsertTable(tableName, indexName, props);
+        // Make sure that the plan creates is of ClientSelectDeleteMutationPlan
+        verifyDeletePlan(delete, DeleteCompiler.ClientSelectDeleteMutationPlan.class, props);
+        executeDelete(delete, props, 1);
+        String startRowKeyForBaseTable = "1";
+        String startRowKeyForIndexTable = "foo";
+        // Make sure that Delete Marker has cell tag for base table
+        // and has no cell tag for index table.
+        checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+        checkTagPresentInDeleteMarker(indexName, startRowKeyForIndexTable, false, null);
+    }
+
+    /*
+        Tests whether we have cell tags in delete marker for
+        ServerSelectDeleteMutationPlan.
+     */
+    @Test
+    public void testDeleteServerDeleteMutationPlan() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        String tagValue = "customer-delete";
+        String delete = "DELETE FROM " + tableName;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+
+        createAndUpsertTable(tableName, indexName, props);
+        // Make sure that the plan creates is of ServerSelectDeleteMutationPlan
+        verifyDeletePlan(delete, DeleteCompiler.ServerSelectDeleteMutationPlan.class, props);
+        executeDelete(delete, props, 2);
+
+        String startRowKeyForBaseTable = "1";
+        String startRowKeyForIndexTable = "foo";
+        // Make sure that Delete Marker has cell tag for base table
+        // and has no cell tag for index table.
+        checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+        checkTagPresentInDeleteMarker(indexName, startRowKeyForIndexTable, false, null);
+    }
 
+    /*
+        Tests whether we have cell tags in delete marker for
+        MultiRowDeleteMutationPlan.
+    */
+    @Test
+    public void testDeleteMultiRowDeleteMutationPlan() throws Exception {
+        String tableName = generateUniqueName();
+        String tagValue = "customer-delete";
+        String delete = "DELETE FROM " + tableName + " WHERE k = 1";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+        // Don't create index table. We will use MultiRowDeleteMutationPlan
+        // if there is no index present for a table.
+        createAndUpsertTable(tableName, null, props);
+        // Make sure that the plan creates is of MultiRowDeleteMutationPlan
+        verifyDeletePlan(delete, DeleteCompiler.MultiRowDeleteMutationPlan.class, props);
+        executeDelete(delete, props, 1);
+        String startRowKeyForBaseTable = "1";
+        // Make sure that Delete Marker has cell tag for base table.
+        // We haven't created index table for this test case.
+        checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+    }
+
+    /*
+        Verify whether plan that we create for delete statement is of planClass
+     */
+    private void verifyDeletePlan(String delete, Class<? extends MutationPlan> planClass,
+            Properties props) throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+            SQLParser parser = new SQLParser(delete);
+            DeleteStatement deleteStmt = (DeleteStatement) parser.parseStatement();
+            DeleteCompiler compiler = new DeleteCompiler(stmt, null);
+            MutationPlan plan = compiler.compile(deleteStmt);
+            assertEquals(plan.getClass(), planClass);
+        }
+    }
+    private void createAndUpsertTable(String tableName, String indexName, Properties props)
+            throws SQLException {
+        String ddl = "CREATE TABLE " + tableName +
+                " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)";
+        try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            try (Statement statement = conn.createStatement()) {
+                statement.execute(ddl);
+                if (indexName != null) {
+                    String indexDdl1 = "CREATE INDEX " + indexName + " ON " + tableName + "(v1,v2)";
+                    statement.execute(indexDdl1);
+                }
+                statement.execute(
+                        "upsert into " + tableName + " values (1, 'foo', 'foo1')");
+                statement.execute(
+                        "upsert into " + tableName + " values (2, 'bar', 'bar1')");
+                conn.commit();
+            }
+        }
+    }
+
+    private void executeDelete(String delete, Properties props, int deleteRowCount)
+            throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            try (Statement statement = conn.createStatement()) {
+                int rs = statement.executeUpdate(delete);
+                assertEquals( deleteRowCount, rs);
+            }
+        }
+    }
+
+    /*
+        Verify whether we have tags present for base table and not present for
+        index tables.
+     */
+    private void checkTagPresentInDeleteMarker(String tableName, String startRowKey,
+            boolean tagPresent, String tagValue) throws IOException {
+        List<Cell> values = new ArrayList<>();
+        TableName table = TableName.valueOf(tableName);
+        // Scan table with specified startRowKey
+        for (HRegion region : getUtility().getHBaseCluster().getRegions(table)) {
+            Scan scan = new Scan();
+            // Make sure to set rawScan to true so that we will get Delete Markers.
+            scan.setRaw(true);
+            scan.setStartRow(Bytes.toBytes(startRowKey));
+            RegionScanner scanner = region.getScanner(scan);
+            scanner.next(values);
+            if (!values.isEmpty()) {
+                break;
+            }
+        }
+        assertFalse("Values shouldn't be empty", values.isEmpty());
+        Cell first = values.get(0);
+        assertTrue("First cell should be delete marker ", CellUtil.isDelete(first));
+        List<Tag> tags = Tag.asList(first.getTagsArray(),
+                first.getTagsOffset(), first.getTagsLength());
+        if (tagPresent) {
+            assertEquals(1, tags.size());
+            Tag sourceOfOperationTag = Tag.getTag(first.getTagsArray(), first.getTagsOffset(),
+                    first.getTagsLength(), PhoenixTagType.SOURCE_OPERATION_TAG_TYPE);
+            Assert.assertNotNull(sourceOfOperationTag);
+            assertEquals(tagValue, Bytes.toString(sourceOfOperationTag.getValue()));
+        } else {
+            assertEquals(0, tags.size());
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/PhoenixTagType.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/PhoenixTagType.java
new file mode 100644
index 0000000..da2064e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/PhoenixTagType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/**
+    Used to persist the TagType in HBase Cell Tags.
+    All the type present here should be more than @{@link Tag#CUSTOM_TAG_TYPE_RANGE} which is 64.
+ **/
+public final class PhoenixTagType {
+    /**
+     * Indicates the source of operation.
+     */
+    public static final byte SOURCE_OPERATION_TAG_TYPE = (byte) 65;
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
index face677..69807c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
@@ -62,5 +62,4 @@ public class CallRunner {
             }
         }
     }
-
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index ebba909..ca573dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -22,7 +22,6 @@ import static org.apache.phoenix.util.NumberUtil.add;
 import java.io.IOException;
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -351,7 +351,7 @@ public class DeleteCompiler {
         return Collections.emptyList();
     }
     
-    private class MultiRowDeleteMutationPlan implements MutationPlan {
+    public class MultiRowDeleteMutationPlan implements MutationPlan {
         private final List<MutationPlan> plans;
         private final MutationPlan firstPlan;
         private final QueryPlan dataPlan;
@@ -621,7 +621,7 @@ public class DeleteCompiler {
             final int offset = table.getBucketNum() == null ? 0 : 1;
             Iterator<PColumn> projectedColsItr = projectedColumns.iterator();
             int i = 0;
-            while(projectedColsItr.hasNext()) {
+            while (projectedColsItr.hasNext()) {
                 final int position = i++;
                 adjustedProjectedColumns.add(new DelegateColumn(projectedColsItr.next()) {
                     @Override
@@ -742,7 +742,7 @@ public class DeleteCompiler {
         }
     }
 
-    private class ServerSelectDeleteMutationPlan implements MutationPlan {
+    public class ServerSelectDeleteMutationPlan implements MutationPlan {
         private final StatementContext context;
         private final QueryPlan dataPlan;
         private final PhoenixConnection connection;
@@ -802,6 +802,11 @@ public class DeleteCompiler {
                     context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
                     context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                     ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION);
+                    String sourceOfDelete = statement.getConnection().getSourceOfOperation();
+                    if (sourceOfDelete != null) {
+                        context.getScan().setAttribute(QueryServices.SOURCE_OPERATION_ATTRIB,
+                                Bytes.toBytes(sourceOfDelete));
+                    }
                 }
                 ResultIterator iterator = aggPlan.iterator();
                 try {
@@ -860,7 +865,7 @@ public class DeleteCompiler {
         }
     }
 
-    private class ClientSelectDeleteMutationPlan implements MutationPlan {
+    public class ClientSelectDeleteMutationPlan implements MutationPlan {
         private final StatementContext context;
         private final TableRef targetTableRef;
         private final QueryPlan dataPlan;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 362d08c..7f4e73c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 
@@ -430,6 +431,12 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         if (replayMutations != null) {
             delete.setAttribute(REPLAY_WRITES, replayMutations);
         }
+        byte[] sourceOperationBytes =
+                scan.getAttribute(SOURCE_OPERATION_ATTRIB);
+        if (sourceOperationBytes != null) {
+            delete.setAttribute(SOURCE_OPERATION_ATTRIB, sourceOperationBytes);
+        }
+
         mutations.add(delete);
         // force tephra to ignore this deletes
         delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
@@ -446,6 +453,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
             delete.deleteColumns(deleteCF,  deleteCQ, ts);
             // force tephra to ignore this deletes
             delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+            // TODO: We need to set SOURCE_OPERATION_ATTRIB here also. The control will come here if
+            // TODO: we drop a column. We also delete metadata from SYSCAT table for the dropped column
+            // TODO: and delete the column. In short, we need to set this attribute for the DM for SYSCAT metadata
+            // TODO: and for data table rows.
             mutations.add(delete);
         }
     }
@@ -469,7 +480,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                     SortOrder.invert(values[i], 0, values[i], 0,
                             values[i].length);
                 }
-            }else{
+            } else {
                 values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
             }
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 207c2bf..d1f97d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
@@ -646,6 +647,14 @@ public class MutationState implements SQLCloseable {
             if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
                 row.delete();
                 rowMutations = row.toRowMutations();
+                String sourceOfDelete = getConnection().getSourceOfOperation();
+                if (sourceOfDelete != null) {
+                    byte[] sourceOfDeleteBytes = Bytes.toBytes(sourceOfDelete);
+                    // Set the source of operation attribute.
+                    for (Mutation mutation: rowMutations) {
+                        mutation.setAttribute(SOURCE_OPERATION_ATTRIB, sourceOfDeleteBytes);
+                    }
+                }
                 // The DeleteCompiler already generates the deletes for indexes, so no need to do it again
                 rowMutationsPertainingToIndex = Collections.emptyList();
             } else {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 93c9766..2a7747d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.PhoenixTagType;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagRewriteCell;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -922,6 +927,9 @@ public class IndexRegionObserver extends BaseRegionObserver {
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
         BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
         setBatchMutateContext(c, context);
+        // Need to add cell tags to Delete Marker before we do any index processing
+        // since we add tags to tables which doesn't have indexes also.
+        setDeleteAttributes(miniBatchOp);
 
         /*
          * Exclusively lock all rows so we get a consistent read
@@ -978,6 +986,42 @@ public class IndexRegionObserver extends BaseRegionObserver {
         }
     }
 
+    /**
+     * Set Cell Tags to delete markers with source of operation attribute.
+     * @param miniBatchOp
+     * @throws IOException
+     */
+    private void setDeleteAttributes(MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!(m instanceof  Delete)) {
+                // Ignore if it is not Delete type.
+                continue;
+            }
+            byte[] sourceOpAttr = m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
+            if (sourceOpAttr == null) {
+                continue;
+            }
+            Tag sourceOpTag = new Tag(PhoenixTagType.SOURCE_OPERATION_TAG_TYPE, sourceOpAttr);
+            List<Cell> updatedCells = new ArrayList<>();
+            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
+                Cell cell = cellScanner.current();
+                List<Tag> tags = Tag.asList(cell.getTagsArray(),
+                        cell.getTagsOffset(), cell.getTagsLength());
+                tags.add(sourceOpTag);
+                Cell updatedCell = new TagRewriteCell(cell, Tag.fromList(tags));
+                updatedCells.add(updatedCell);
+            }
+            m.getFamilyCellMap().clear();
+            // Clear and add new Cells to the Mutation.
+            for (Cell cell : updatedCells) {
+                Delete d = (Delete) m;
+                d.addDeleteMarker(cell);
+            }
+        }
+    }
+
   private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
   }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d7adcf4..cd4eb15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -173,6 +173,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     private boolean isRunningUpgrade;
     private LogLevel logLevel;
     private Double logSamplingRate;
+    private String sourceOfOperation;
 
     private Object queueCreationLock = new Object(); // lock for the lazy init path of childConnections structure
     private ConcurrentLinkedQueue<PhoenixConnection> childConnections = null;
@@ -403,6 +404,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         } else {
             GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
         }
+        this.sourceOfOperation =
+                this.services.getProps().get(QueryServices.SOURCE_OPERATION_ATTRIB, null);
     }
 
     private static void checkScn(Long scnParam) throws SQLException {
@@ -1360,4 +1363,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         return this.logSamplingRate;
     }
 
+    /**
+     *
+     * @return source of operation
+     */
+    public String getSourceOfOperation() {
+        return sourceOfOperation;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 52cb2b0..893f5d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -363,6 +363,13 @@ public interface QueryServices extends SQLCloseable {
     public static final String GUIDE_POSTS_CACHE_FACTORY_CLASS = "phoenix.guide.posts.cache.factory.class";
 
     public static final String PENDING_MUTATIONS_DDL_THROW_ATTRIB = "phoenix.pending.mutations.before.ddl.throw";
+
+    /**
+     * Parameter to indicate the source of operation attribute.
+     * It can include metadata about the customer, service, etc.
+     */
+    String SOURCE_OPERATION_ATTRIB = "phoenix.source.operation";
+
     /**
      * Get executor service used for parallel scans
      */
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index 4d5a424..5810fed 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -370,12 +370,12 @@ public class QueryOptimizerTest extends BaseConnectionlessQueryTest {
         DeleteCompiler compiler = new DeleteCompiler(stmt, null);
         MutationPlan plan = compiler.compile(delete);
         assertEquals("T", plan.getQueryPlan().getTableRef().getTable().getTableName().getString());
-        assertTrue(plan.getClass().getName().contains("ServerSelectDeleteMutationPlan"));
+        assertEquals(plan.getClass(), DeleteCompiler.ServerSelectDeleteMutationPlan.class);
         parser = new SQLParser("DELETE FROM t WHERE v1 = 'foo'");
         delete = (DeleteStatement) parser.parseStatement();
         plan = compiler.compile(delete);
         assertEquals("IDX", plan.getQueryPlan().getTableRef().getTable().getTableName().getString());
-        assertTrue(plan.getClass().getName().contains("ClientSelectDeleteMutationPlan"));
+        assertEquals(plan.getClass(), DeleteCompiler.ClientSelectDeleteMutationPlan.class);
     }
 
     @Test