You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by tk...@apache.org on 2023/02/06 19:10:02 UTC
[phoenix] branch 5.1 updated: PHOENIX-6052 Include syscat time in mutation time (#1557)
This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new 1cb03af181 PHOENIX-6052 Include syscat time in mutation time (#1557)
1cb03af181 is described below
commit 1cb03af1813cb1a4f961c52834744630b687174c
Author: Aman Poonia <am...@gmail.com>
AuthorDate: Tue Feb 7 00:39:56 2023 +0530
PHOENIX-6052 Include syscat time in mutation time (#1557)
---
.../org/apache/phoenix/execute/MutationState.java | 89 ++++++++++++----------
.../phoenix/monitoring/GlobalClientMetrics.java | 2 +
.../org/apache/phoenix/monitoring/MetricType.java | 1 +
3 files changed, 53 insertions(+), 39 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 17a16b11e0..e32f5a6e76 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.execute;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SYSCAT_TIME;
import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
@@ -887,52 +888,62 @@ public class MutationState implements SQLCloseable {
// If we're auto committing, we've already validated the schema when we got the ColumnResolver,
// so no need to do it again here.
PTable table = tableRef.getTable();
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ try {
- // We generally don't re-resolve SYSTEM tables, but if it relies on ROW_TIMESTAMP, we must
- // get the latest timestamp in order to upsert data with the correct server-side timestamp
- // in case the ROW_TIMESTAMP is not provided in the UPSERT statement.
- boolean hitServerForLatestTimestamp =
- table.getRowTimestampColPos() != -1 && table.getType() == PTableType.SYSTEM;
- MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(),
- table.getTableName().getString(), hitServerForLatestTimestamp);
- PTable resolvedTable = result.getTable();
- if (resolvedTable == null) { throw new TableNotFoundException(table.getSchemaName().getString(), table
- .getTableName().getString()); }
- // Always update tableRef table as the one we've cached may be out of date since when we executed
- // the UPSERT VALUES call and updated in the cache before this.
- tableRef.setTable(resolvedTable);
- List<PTable> indexes = resolvedTable.getIndexes();
- for (PTable idxTtable : indexes) {
- // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that
- // our failure mode is block writes on index failure.
- if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE)
- && idxTtable.getIndexDisableTimestamp() > 0) { throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE).setSchemaName(table.getSchemaName().getString())
- .setTableName(table.getTableName().getString()).build().buildException(); }
- }
- long timestamp = result.getMutationTime();
- if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
- serverTimeStamp = timestamp;
- if (result.wasUpdated()) {
- List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
- for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
- RowMutationState valueEntry = rowEntry.getValue();
- if (valueEntry != null) {
- Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
- if (colValues != PRow.DELETE_MARKER) {
- for (PColumn column : colValues.keySet()) {
- if (!column.isDynamic()) columns.add(column);
+ // We generally don't re-resolve SYSTEM tables, but if it relies on ROW_TIMESTAMP, we must
+ // get the latest timestamp in order to upsert data with the correct server-side timestamp
+ // in case the ROW_TIMESTAMP is not provided in the UPSERT statement.
+ boolean hitServerForLatestTimestamp =
+ table.getRowTimestampColPos() != -1 && table.getType() == PTableType.SYSTEM;
+ MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(),
+ table.getTableName().getString(), hitServerForLatestTimestamp);
+ PTable resolvedTable = result.getTable();
+ if (resolvedTable == null) {
+ throw new TableNotFoundException(table.getSchemaName().getString(), table
+ .getTableName().getString());
+ }
+ // Always update tableRef table as the one we've cached may be out of date since when we executed
+ // the UPSERT VALUES call and updated in the cache before this.
+ tableRef.setTable(resolvedTable);
+ List<PTable> indexes = resolvedTable.getIndexes();
+ for (PTable idxTtable : indexes) {
+ // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that
+ // our failure mode is block writes on index failure.
+ if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE)
+ && idxTtable.getIndexDisableTimestamp() > 0) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE).setSchemaName(table.getSchemaName().getString())
+ .setTableName(table.getTableName().getString()).build().buildException();
+ }
+ }
+ long timestamp = result.getMutationTime();
+ if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
+ serverTimeStamp = timestamp;
+ if (result.wasUpdated()) {
+ List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
+ for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
+ RowMutationState valueEntry = rowEntry.getValue();
+ if (valueEntry != null) {
+ Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+ if (colValues != PRow.DELETE_MARKER) {
+ for (PColumn column : colValues.keySet()) {
+ if (!column.isDynamic()) columns.add(column);
+ }
}
}
}
- }
- for (PColumn column : columns) {
- if (column != null) {
- resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(
- column.getName().getString());
+ for (PColumn column : columns) {
+ if (column != null) {
+ resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(
+ column.getName().getString());
+ }
}
}
}
+ } finally {
+ long endTime = EnvironmentEdgeManager.currentTimeMillis();
+ GLOBAL_MUTATION_SYSCAT_TIME.update(endTime - startTime);
}
return serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 4ed025fe40..ad19e05aa5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_SYSCAT_TIME;
import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS;
import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
@@ -85,6 +86,7 @@ public enum GlobalClientMetrics {
GLOBAL_MUTATION_COMMIT_TIME(MUTATION_COMMIT_TIME),
GLOBAL_MUTATION_BATCH_FAILED_COUNT(MUTATION_BATCH_FAILED_SIZE),
GLOBAL_MUTATION_INDEX_COMMIT_FAILURE_COUNT(INDEX_COMMIT_FAILURE_SIZE),
+ GLOBAL_MUTATION_SYSCAT_TIME(MUTATION_SYSCAT_TIME),
GLOBAL_QUERY_TIME(QUERY_TIME),
GLOBAL_NUM_PARALLEL_SCANS(NUM_PARALLEL_SCANS),
GLOBAL_SCAN_BYTES(SCAN_BYTES),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index fa28f4bdb7..75907761e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -32,6 +32,7 @@ public enum MetricType {
MUTATION_BATCH_SIZE("ms", "Number of mutations in the batch",LogLevel.OFF, PLong.INSTANCE),
MUTATION_BYTES("mb", "Size of mutations in bytes",LogLevel.OFF, PLong.INSTANCE),
MUTATION_COMMIT_TIME("mt", "Time it took to commit a batch of mutations",LogLevel.OFF, PLong.INSTANCE),
+ MUTATION_SYSCAT_TIME("msyst", "Time it spent in syscat before mutation", LogLevel.OFF, PLong.INSTANCE),
MUTATION_BATCH_FAILED_SIZE("mfs", "Number of mutations that failed to be committed",LogLevel.OFF, PLong.INSTANCE),
MUTATION_SQL_COUNTER("msc", "Counter for number of mutation sql statements",LogLevel.OFF, PLong.INSTANCE),
INDEX_COMMIT_FAILURE_SIZE("p3s", "Number of mutations that failed in phase 3", LogLevel.OFF, PLong.INSTANCE),