You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/02/06 19:10:02 UTC

[phoenix] branch 5.1 updated: PHOENIX-6052 Include syscat time in mutation time (#1557)

This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 1cb03af181 PHOENIX-6052 Include syscat time in mutation time (#1557)
1cb03af181 is described below

commit 1cb03af1813cb1a4f961c52834744630b687174c
Author: Aman Poonia <am...@gmail.com>
AuthorDate: Tue Feb 7 00:39:56 2023 +0530

    PHOENIX-6052 Include syscat time in mutation time (#1557)
---
 .../org/apache/phoenix/execute/MutationState.java  | 89 ++++++++++++----------
 .../phoenix/monitoring/GlobalClientMetrics.java    |  2 +
 .../org/apache/phoenix/monitoring/MetricType.java  |  1 +
 3 files changed, 53 insertions(+), 39 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 17a16b11e0..e32f5a6e76 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SYSCAT_TIME;
 import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
 import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
@@ -887,52 +888,62 @@ public class MutationState implements SQLCloseable {
         // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
         // so no need to do it again here.
         PTable table = tableRef.getTable();
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        try {
 
-        // We generally don't re-resolve SYSTEM tables, but if it relies on ROW_TIMESTAMP, we must
-        // get the latest timestamp in order to upsert data with the correct server-side timestamp
-        // in case the ROW_TIMESTAMP is not provided in the UPSERT statement.
-        boolean hitServerForLatestTimestamp =
-                table.getRowTimestampColPos() != -1 && table.getType() == PTableType.SYSTEM;
-        MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(),
-                table.getTableName().getString(), hitServerForLatestTimestamp);
-        PTable resolvedTable = result.getTable();
-        if (resolvedTable == null) { throw new TableNotFoundException(table.getSchemaName().getString(), table
-                .getTableName().getString()); }
-        // Always update tableRef table as the one we've cached may be out of date since when we executed
-        // the UPSERT VALUES call and updated in the cache before this.
-        tableRef.setTable(resolvedTable);
-        List<PTable> indexes = resolvedTable.getIndexes();
-        for (PTable idxTtable : indexes) {
-            // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that
-            // our failure mode is block writes on index failure.
-            if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE)
-                    && idxTtable.getIndexDisableTimestamp() > 0) { throw new SQLExceptionInfo.Builder(
-                    SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE).setSchemaName(table.getSchemaName().getString())
-                    .setTableName(table.getTableName().getString()).build().buildException(); }
-        }
-        long timestamp = result.getMutationTime();
-        if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
-            serverTimeStamp = timestamp;
-            if (result.wasUpdated()) {
-                List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
-                for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
-                    RowMutationState valueEntry = rowEntry.getValue();
-                    if (valueEntry != null) {
-                        Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
-                        if (colValues != PRow.DELETE_MARKER) {
-                            for (PColumn column : colValues.keySet()) {
-                                if (!column.isDynamic()) columns.add(column);
+            // We generally don't re-resolve SYSTEM tables, but if it relies on ROW_TIMESTAMP, we must
+            // get the latest timestamp in order to upsert data with the correct server-side timestamp
+            // in case the ROW_TIMESTAMP is not provided in the UPSERT statement.
+            boolean hitServerForLatestTimestamp =
+                    table.getRowTimestampColPos() != -1 && table.getType() == PTableType.SYSTEM;
+            MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(),
+                    table.getTableName().getString(), hitServerForLatestTimestamp);
+            PTable resolvedTable = result.getTable();
+            if (resolvedTable == null) {
+                throw new TableNotFoundException(table.getSchemaName().getString(), table
+                        .getTableName().getString());
+            }
+            // Always update tableRef table as the one we've cached may be out of date since when we executed
+            // the UPSERT VALUES call and updated in the cache before this.
+            tableRef.setTable(resolvedTable);
+            List<PTable> indexes = resolvedTable.getIndexes();
+            for (PTable idxTtable : indexes) {
+                // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that
+                // our failure mode is block writes on index failure.
+                if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE)
+                        && idxTtable.getIndexDisableTimestamp() > 0) {
+                    throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE).setSchemaName(table.getSchemaName().getString())
+                            .setTableName(table.getTableName().getString()).build().buildException();
+                }
+            }
+            long timestamp = result.getMutationTime();
+            if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+                serverTimeStamp = timestamp;
+                if (result.wasUpdated()) {
+                    List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
+                    for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
+                        RowMutationState valueEntry = rowEntry.getValue();
+                        if (valueEntry != null) {
+                            Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+                            if (colValues != PRow.DELETE_MARKER) {
+                                for (PColumn column : colValues.keySet()) {
+                                    if (!column.isDynamic()) columns.add(column);
+                                }
                             }
                         }
                     }
-                }
-                for (PColumn column : columns) {
-                    if (column != null) {
-                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(
-                                column.getName().getString());
+                    for (PColumn column : columns) {
+                        if (column != null) {
+                            resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(
+                                    column.getName().getString());
+                        }
                     }
                 }
             }
+        } finally {
+            long endTime = EnvironmentEdgeManager.currentTimeMillis();
+            GLOBAL_MUTATION_SYSCAT_TIME.update(endTime - startTime); 
         }
         return serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 4ed025fe40..ad19e05aa5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SYSCAT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
 import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
@@ -85,6 +86,7 @@ public enum GlobalClientMetrics {
     GLOBAL_MUTATION_COMMIT_TIME(MUTATION_COMMIT_TIME),
     GLOBAL_MUTATION_BATCH_FAILED_COUNT(MUTATION_BATCH_FAILED_SIZE),
     GLOBAL_MUTATION_INDEX_COMMIT_FAILURE_COUNT(INDEX_COMMIT_FAILURE_SIZE),
+    GLOBAL_MUTATION_SYSCAT_TIME(MUTATION_SYSCAT_TIME),
     GLOBAL_QUERY_TIME(QUERY_TIME),
     GLOBAL_NUM_PARALLEL_SCANS(NUM_PARALLEL_SCANS),
     GLOBAL_SCAN_BYTES(SCAN_BYTES),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index fa28f4bdb7..75907761e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -32,6 +32,7 @@ public enum MetricType {
     MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch",LogLevel.OFF, PLong.INSTANCE),
     MUTATION_BYTES("mb", "Size of mutations in bytes",LogLevel.OFF, PLong.INSTANCE),
     MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations",LogLevel.OFF, PLong.INSTANCE),
+    MUTATION_SYSCAT_TIME("msyst", "Time it spent in syscat before mutation", LogLevel.OFF, PLong.INSTANCE),
     MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed",LogLevel.OFF, PLong.INSTANCE),
     MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements",LogLevel.OFF, PLong.INSTANCE),
     INDEX_COMMIT_FAILURE_SIZE("p3s", "Number of mutations that failed in phase 3", LogLevel.OFF, PLong.INSTANCE),