Posted to commits@phoenix.apache.org by sh...@apache.org on 2023/11/29 17:54:35 UTC
(phoenix) branch PHOENIX-6883-feature updated: PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726)
This is an automated email from the ASF dual-hosted git repository.
shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
new 4499c2965a PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726)
4499c2965a is described below
commit 4499c2965af9e07491bfab104b94d8be697d0e35
Author: palash <pa...@gmail.com>
AuthorDate: Wed Nov 29 23:24:30 2023 +0530
PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726)
---
.../org/apache/phoenix/execute/MutationState.java | 66 +++-
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 135 +------
.../phoenix/monitoring/GlobalClientMetrics.java | 4 +-
.../org/apache/phoenix/monitoring/MetricType.java | 3 +
.../phoenix/util/ValidateLastDDLTimestampUtil.java | 211 ++++++++++
.../phoenix/cache/ServerMetadataCacheTest.java | 440 ++++++++++++++++++++-
6 files changed, 708 insertions(+), 151 deletions(-)
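
In practice, this patch wires last_ddl_timestamp validation into the write
path: at commit time, MutationState validates the timestamps for every table
in the mutation batch (and, on the write path, their indexes), force-refreshes
the client metadata cache when a StaleMetadataCacheException is returned, and
a new global metric counts those exceptions. A minimal client-side sketch of
how the feature is switched on (the connection URL and table name are
placeholders, and a cluster running this feature branch is assumed):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;
    import org.apache.phoenix.query.QueryServices;

    public class WritePathValidationExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // enable last_ddl_timestamp validation for this connection
            props.setProperty(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, "true");
            try (Connection conn =
                    DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                conn.createStatement().execute(
                        "UPSERT INTO MY_TABLE (K, V1) VALUES (1, 'a')");
                // DDL timestamps for MY_TABLE and its indexes are validated
                // here, before the mutations are sent to the server
                conn.commit();
            }
        }
    }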
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1cf23d7e0f..1612fe36a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -68,6 +68,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -122,6 +123,7 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TransactionUtil;
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
import org.apache.phoenix.util.WALAnnotationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,6 +161,7 @@ public class MutationState implements SQLCloseable {
private long estimatedSize = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false;
+ private boolean validateLastDdlTimestamp;
private Map<TableRef, List<MultiRowMutationState>> txMutations = Collections.emptyMap();
private PhoenixTransactionContext phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
@@ -221,6 +224,8 @@ public class MutationState implements SQLCloseable {
boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
+ this.validateLastDdlTimestamp = ValidateLastDDLTimestampUtil
+ .getValidateLastDdlTimestampEnabled(this.connection);
if (subTask) {
// this code path is only used while running child scans, we can't pass the txContext to child scans
// as it is not thread safe, so we use the tx member variable
@@ -946,26 +951,33 @@ public class MutationState implements SQLCloseable {
.setTableName(table.getTableName().getString()).build().buildException(); }
}
long timestamp = result.getMutationTime();
- if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
- serverTimeStamp = timestamp;
- if (result.wasUpdated()) {
- List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
- for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
- RowMutationState valueEntry = rowEntry.getValue();
- if (valueEntry != null) {
- Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
- if (colValues != PRow.DELETE_MARKER) {
- for (PColumn column : colValues.keySet()) {
- if (!column.isDynamic()) columns.add(column);
+ serverTimeStamp = timestamp;
+
+ /* when last_ddl_timestamp validation is enabled,
+ we don't know if this table's cache result was force-updated
+ during the validation, so always validate columns */
+ if ((timestamp != QueryConstants.UNSET_TIMESTAMP && result.wasUpdated())
+ || this.validateLastDdlTimestamp) {
+ List<PColumn> columns
+ = Lists.newArrayListWithExpectedSize(table.getColumns().size());
+ for (Map.Entry<ImmutableBytesPtr, RowMutationState>
+ rowEntry : rowKeyToColumnMap.entrySet()) {
+ RowMutationState valueEntry = rowEntry.getValue();
+ if (valueEntry != null) {
+ Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+ if (colValues != PRow.DELETE_MARKER) {
+ for (PColumn column : colValues.keySet()) {
+ if (!column.isDynamic()) {
+ columns.add(column);
}
}
}
}
- for (PColumn column : columns) {
- if (column != null) {
- resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(
- column.getName().getString());
- }
+ }
+ for (PColumn column : columns) {
+ if (column != null) {
+ resolvedTable.getColumnFamily(column.getFamilyName().getString())
+ .getPColumnForColumnName(column.getName().getString());
}
}
}
@@ -1201,6 +1213,28 @@ public class MutationState implements SQLCloseable {
commitBatches = createCommitBatches(tableRefIterator);
}
+ //if enabled, validate last ddl timestamps for all tables in the mutationsMap
+ //for now, force update client cache for all tables if StaleMetadataCacheException is seen
+ if (this.validateLastDdlTimestamp) {
+ List<TableRef> tableRefs = new ArrayList<>(this.mutationsMap.keySet());
+ try {
+ ValidateLastDDLTimestampUtil.validateLastDDLTimestamp(
+ connection, tableRefs, true, true);
+ } catch (StaleMetadataCacheException e) {
+ GlobalClientMetrics
+ .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.increment();
+ MetaDataClient mc = new MetaDataClient(connection);
+ PName tenantId = connection.getTenantId();
+ LOGGER.debug("Force updating client metadata cache for {}",
+ ValidateLastDDLTimestampUtil.getInfoString(tenantId, tableRefs));
+ for (TableRef tableRef : tableRefs) {
+ String schemaName = tableRef.getTable().getSchemaName().toString();
+ String tableName = tableRef.getTable().getTableName().toString();
+ mc.updateCache(tenantId, schemaName, tableName, true);
+ }
+ }
+ }
+
for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatches) {
long [] serverTimestamps = validateServerTimestamps ? validateAll(commitBatch) : null;
sendBatch(commitBatch, serverTimestamps, sendAll);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d68e981d10..41981c41c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -64,20 +64,14 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.call.CallRunner;
import org.apache.phoenix.compile.BaseMutationPlan;
@@ -107,8 +101,6 @@ import org.apache.phoenix.compile.StatementPlan;
import org.apache.phoenix.compile.TraceQueryPlan;
import org.apache.phoenix.compile.UpsertCompiler;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
-import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.exception.StaleMetadataCacheException;
@@ -128,6 +120,7 @@ import org.apache.phoenix.log.QueryLogInfo;
import org.apache.phoenix.log.QueryLogger;
import org.apache.phoenix.log.QueryLoggerUtil;
import org.apache.phoenix.log.QueryStatus;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.TableMetricsManager;
import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AddColumnStatement;
@@ -206,12 +199,10 @@ import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeyValueAccessor;
import org.apache.phoenix.schema.Sequence;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
@@ -233,6 +224,7 @@ import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -308,7 +300,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
public PhoenixStatement(PhoenixConnection connection) {
this.connection = connection;
this.queryTimeoutMillis = getDefaultQueryTimeoutMillis();
- this.validateLastDdlTimestamp = getValidateLastDdlTimestampEnabled();
+ this.validateLastDdlTimestamp = ValidateLastDDLTimestampUtil
+ .getValidateLastDdlTimestampEnabled(this.connection);
}
/**
@@ -320,12 +313,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
}
- private boolean getValidateLastDdlTimestampEnabled() {
- return connection.getQueryServices().getProps()
- .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
- QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED);
- }
-
protected List<PhoenixResultSet> getResultSets() {
return resultSets;
}
@@ -349,109 +336,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
return executeQuery(stmt, true, queryLogger, noCommit, this.validateLastDdlTimestamp);
}
- private String getInfoString(TableRef tableRef) {
- return String.format("Tenant: %s, Schema: %s, Table: %s",
- this.connection.getTenantId(),
- tableRef.getTable().getSchemaName(),
- tableRef.getTable().getTableName());
- }
-
- private void setLastDDLTimestampRequestParameters(
- RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder, PTable pTable) {
- byte[] tenantIDBytes = this.connection.getTenantId() == null
- ? HConstants.EMPTY_BYTE_ARRAY
- : this.connection.getTenantId().getBytes();
- byte[] schemaBytes = pTable.getSchemaName() == null
- ? HConstants.EMPTY_BYTE_ARRAY
- : pTable.getSchemaName().getBytes();
- builder.setTenantId(ByteStringer.wrap(tenantIDBytes));
- builder.setSchemaName(ByteStringer.wrap(schemaBytes));
- builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes()));
- builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp());
- }
- /**
- * Build a request for the validateLastDDLTimestamp RPC.
- * @param tableRef
- * @return ValidateLastDDLTimestampRequest for the table in tableRef
- */
- private RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
- getValidateDDLTimestampRequest(TableRef tableRef) throws TableNotFoundException {
- RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
- = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
- RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder
- = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
-
- //when querying an index, we need to validate its parent table in case the index was dropped
- if (PTableType.INDEX.equals(tableRef.getTable().getType())) {
- PTableKey key = new PTableKey(this.connection.getTenantId(),
- tableRef.getTable().getParentName().getString());
- PTable parentTable = this.connection.getTable(key);
- setLastDDLTimestampRequestParameters(innerBuilder, parentTable);
- requestBuilder.addLastDDLTimestampRequests(innerBuilder);
- }
-
- innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
- setLastDDLTimestampRequestParameters(innerBuilder, tableRef.getTable());
- requestBuilder.addLastDDLTimestampRequests(innerBuilder);
-
- //when querying a view, we need to validate last ddl timestamps for all its ancestors
- if (PTableType.VIEW.equals(tableRef.getTable().getType())) {
- PTable pTable = tableRef.getTable();
- while (pTable.getParentName() != null) {
- PTableKey key = new PTableKey(this.connection.getTenantId(),
- pTable.getParentName().getString());
- PTable parentTable = this.connection.getTable(key);
- innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
- setLastDDLTimestampRequestParameters(innerBuilder, parentTable);
- requestBuilder.addLastDDLTimestampRequests(innerBuilder);
- pTable = parentTable;
- }
- }
- return requestBuilder.build();
- }
-
- /**
- * Verifies that table metadata in client cache is up-to-date with server.
- * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
- * Retry once if there was an error performing the RPC, otherwise throw the Exception.
- * @param tableRef
- * @throws SQLException
- */
- private void validateLastDDLTimestamp(TableRef tableRef, boolean doRetry) throws SQLException {
-
- String infoString = getInfoString(tableRef);
- try (Admin admin = this.connection.getQueryServices().getAdmin()) {
- // get all live region servers
- List<ServerName> regionServers
- = this.connection.getQueryServices().getLiveRegionServers();
- // pick one at random
- ServerName regionServer
- = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
-
- LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
- infoString, regionServer);
-
- // RPC
- CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
- PhoenixRegionServerEndpoint.BlockingInterface service
- = PhoenixRegionServerEndpoint.newBlockingStub(channel);
- service.validateLastDDLTimestamp(null, getValidateDDLTimestampRequest(tableRef));
- } catch (Exception e) {
- SQLException parsedException = ServerUtil.parseServerException(e);
- if (parsedException instanceof StaleMetadataCacheException) {
- throw parsedException;
- }
- //retry once for any exceptions other than StaleMetadataCacheException
- LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
- if (doRetry) {
- // update the list of live region servers
- this.connection.getQueryServices().refreshLiveRegionServers();
- validateLastDDLTimestamp(tableRef, false);
- return;
- }
- throw parsedException;
- }
- }
private PhoenixResultSet executeQuery(final CompilableStatement stmt,
final boolean doRetryOnMetaNotFoundError,
@@ -508,7 +392,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
//verify metadata for the table/view/index in the query plan
//plan.getTableRef can be null in some cases like EXPLAIN <query>
if (shouldValidateLastDdlTimestamp && plan.getTableRef() != null) {
- validateLastDDLTimestamp(plan.getTableRef(), true);
+ ValidateLastDDLTimestampUtil.validateLastDDLTimestamp(
+ connection, Arrays.asList(plan.getTableRef()), false, true);
}
// this will create its own trace internally, so we don't wrap this
@@ -564,13 +449,17 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
}
throw e;
} catch (StaleMetadataCacheException e) {
+ GlobalClientMetrics
+ .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER
+ .increment();
updateMetrics = false;
PTable pTable = lastQueryPlan.getTableRef().getTable();
- LOGGER.debug("Force updating client metadata cache for {}",
- getInfoString(getLastQueryPlan().getTableRef()));
String schemaN = pTable.getSchemaName().toString();
String tableN = pTable.getTableName().toString();
PName tenantId = connection.getTenantId();
+ LOGGER.debug("Force updating client metadata cache for {}",
+ ValidateLastDDLTimestampUtil.getInfoString(tenantId,
+ Arrays.asList(getLastQueryPlan().getTableRef())));
// if the index metadata was stale, we will update the client cache
// for the parent table, which will also add the new index metadata
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 70529dc36c..8763a7c5ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -39,6 +39,7 @@ import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.STALE_METADATA_CACHE_EXCEPTION_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
@@ -159,7 +160,8 @@ public enum GlobalClientMetrics {
GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER),
GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER),
- GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER);
+ GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER),
+ GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER(STALE_METADATA_CACHE_EXCEPTION_COUNTER);
private static final Logger LOGGER = LoggerFactory.getLogger(GlobalClientMetrics.class);
private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
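
The new counter is surfaced through the usual global client metrics plumbing,
so callers (and the tests below) can read it programmatically, e.g.:

    long staleCount = GlobalClientMetrics
            .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER
            .getMetric().getValue();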
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index acdba9e551..b7b0c4b562 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -150,6 +150,9 @@ public enum MetricType {
CLIENT_METADATA_CACHE_MISS_COUNTER("cmcm", "Number of cache misses for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
CLIENT_METADATA_CACHE_HIT_COUNTER("cmch", "Number of cache hits for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
PAGED_ROWS_COUNTER("prc", "Number of dummy rows returned to client due to paging.", LogLevel.DEBUG, PLong.INSTANCE),
+ STALE_METADATA_CACHE_EXCEPTION_COUNTER("smce",
+ "Number of StaleMetadataCacheException encountered.",
+ LogLevel.DEBUG, PLong.INSTANCE),
// hbase metrics
COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
new file mode 100644
index 0000000000..47159fccca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.TableRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for last ddl timestamp validation from the client.
+ */
+public class ValidateLastDDLTimestampUtil {
+
+ private ValidateLastDDLTimestampUtil() {}
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(ValidateLastDDLTimestampUtil.class);
+
+ public static String getInfoString(PName tenantId, List<TableRef> tableRefs) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.format("Tenant: %s, ", tenantId));
+ for (TableRef tableRef : tableRefs) {
+ sb.append(String.format("{Schema: %s, Table: %s},",
+ tableRef.getTable().getSchemaName(),
+ tableRef.getTable().getTableName()));
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Get whether last ddl timestamp validation is enabled on the connection
+ * @param connection
+ * @return true if it is enabled, false otherwise
+ */
+ public static boolean getValidateLastDdlTimestampEnabled(PhoenixConnection connection) {
+ return connection.getQueryServices().getProps()
+ .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
+ QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED);
+ }
+
+ /**
+ * Verifies that the client cache's metadata for the given tables is up-to-date with the server.
+ * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
+ * Retries once if there was an error performing the RPC, otherwise throws the exception.
+ * @param tableRefs
+ * @param isWritePath
+ * @param doRetry
+ * @throws SQLException
+ */
+ public static void validateLastDDLTimestamp(
+ PhoenixConnection conn, List<TableRef> tableRefs, boolean isWritePath, boolean doRetry)
+ throws SQLException {
+
+ String infoString = getInfoString(conn.getTenantId(), tableRefs);
+ try (Admin admin = conn.getQueryServices().getAdmin()) {
+ // get all live region servers
+ List<ServerName> regionServers
+ = conn.getQueryServices().getLiveRegionServers();
+ // pick one at random
+ ServerName regionServer
+ = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+ LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
+ infoString, regionServer);
+
+ // RPC
+ CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
+ PhoenixRegionServerEndpoint.BlockingInterface service
+ = PhoenixRegionServerEndpoint.newBlockingStub(channel);
+ RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request
+ = getValidateDDLTimestampRequest(conn, tableRefs, isWritePath);
+ service.validateLastDDLTimestamp(null, request);
+ } catch (Exception e) {
+ SQLException parsedException = ServerUtil.parseServerException(e);
+ if (parsedException instanceof StaleMetadataCacheException) {
+ throw parsedException;
+ }
+ //retry once for any exceptions other than StaleMetadataCacheException
+ LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
+ if (doRetry) {
+ // update the list of live region servers
+ conn.getQueryServices().refreshLiveRegionServers();
+ validateLastDDLTimestamp(conn, tableRefs, isWritePath, false);
+ return;
+ }
+ throw parsedException;
+ }
+ }
+
+ /**
+ * Build a request for the validateLastDDLTimestamp RPC for the given tables.
+ * 1. For a view, we need to add all its ancestors to the request
+ * in case something changed in the hierarchy.
+ * 2. For an index, we need to add its parent table to the request
+ * in case the index was dropped.
+ * 3. On the write path, we need to add all indexes of a table/view
+ * in case index state was changed.
+ * @param conn
+ * @param tableRefs
+ * @param isWritePath
+ * @return ValidateLastDDLTimestampRequest for the given tables
+ */
+ private static RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
+ getValidateDDLTimestampRequest(PhoenixConnection conn, List<TableRef> tableRefs,
+ boolean isWritePath) throws TableNotFoundException {
+
+ RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
+ = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
+ RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder;
+
+ for (TableRef tableRef : tableRefs) {
+
+ //when querying an index, we need to validate its parent table
+ //in case the index was dropped
+ if (PTableType.INDEX.equals(tableRef.getTable().getType())) {
+ innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+ PTableKey key = new PTableKey(conn.getTenantId(),
+ tableRef.getTable().getParentName().getString());
+ PTable parentTable = conn.getTable(key);
+ setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), parentTable);
+ requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+ }
+
+ // add the tableRef to the request
+ innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+ setLastDDLTimestampRequestParameters(
+ innerBuilder, conn.getTenantId(), tableRef.getTable());
+ requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+
+ //when querying a view, we need to validate last ddl timestamps for all its ancestors
+ if (PTableType.VIEW.equals(tableRef.getTable().getType())) {
+ PTable pTable = tableRef.getTable();
+ while (pTable.getParentName() != null) {
+ PTableKey key = new PTableKey(conn.getTenantId(),
+ pTable.getParentName().getString());
+ PTable parentTable = conn.getTable(key);
+ innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+ setLastDDLTimestampRequestParameters(
+ innerBuilder, conn.getTenantId(), parentTable);
+ requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+ pTable = parentTable;
+ }
+ }
+
+ //on the write path, we need to validate all indexes of a table/view
+ //in case index state was changed
+ if (isWritePath) {
+ for (PTable idxPTable : tableRef.getTable().getIndexes()) {
+ innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+ setLastDDLTimestampRequestParameters(
+ innerBuilder, conn.getTenantId(), idxPTable);
+ requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+ }
+ }
+ }
+ return requestBuilder.build();
+ }
+
+ /**
+ * For the given PTable, set the attributes on the LastDDLTimestampRequest.
+ */
+ private static void setLastDDLTimestampRequestParameters(
+ RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder,
+ PName tenantId, PTable pTable) {
+ byte[] tenantIDBytes = tenantId == null
+ ? HConstants.EMPTY_BYTE_ARRAY
+ : tenantId.getBytes();
+ byte[] schemaBytes = pTable.getSchemaName() == null
+ ? HConstants.EMPTY_BYTE_ARRAY
+ : pTable.getSchemaName().getBytes();
+ builder.setTenantId(ByteStringer.wrap(tenantIDBytes));
+ builder.setSchemaName(ByteStringer.wrap(schemaBytes));
+ builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes()));
+ builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp());
+ }
+}
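
The utility can also be invoked directly; a short sketch (conn and tableRefs
assumed to be in scope) illustrating the two boolean parameters:

    // isWritePath=true additionally validates every index of each table,
    // since an index state change must be observed before writing;
    // doRetry=true refreshes the live region server list and retries the
    // RPC once for any failure other than StaleMetadataCacheException.
    ValidateLastDDLTimestampUtil.validateLastDDLTimestamp(
            conn, tableRefs, /* isWritePath */ true, /* doRetry */ true);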
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
index 2a618cdcce..6ae19ad144 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ConnectionProperty;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -36,6 +39,7 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -77,6 +81,11 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
+ @Before
+ public void resetMetrics() {
+ GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().reset();
+ }
+
@After
public void resetMetadataCache() {
ServerMetadataCache.resetCache();
@@ -545,7 +554,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// create table with UCF=never and upsert data using client-1
createTable(conn1, tableName, NEVER);
- upsert(conn1, tableName);
+ upsert(conn1, tableName, true);
// select query from client-2 works to populate client side metadata cache
// there should be 1 update to the client cache
@@ -562,16 +571,22 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// select query from client-2 with old ddl timestamp works
// there should be one update to the client cache
+ //verify client got a StaleMetadataCacheException
query(conn2, tableName);
expectedNumCacheUpdates = 1;
Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
.addTable(any(PTable.class), anyLong());
+ Assert.assertEquals("Client should have encountered a StaleMetadataCacheException",
+ 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
// select query from client-2 with latest ddl timestamp works
// there should be no more updates to client cache
+ //verify client did not get another StaleMetadataCacheException
query(conn2, tableName);
Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
.addTable(any(PTable.class), anyLong());
+ Assert.assertEquals("Client should have encountered a StaleMetadataCacheException",
+ 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
}
}
@@ -594,7 +609,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// create table and upsert using client-1
createTable(conn1, tableName, NEVER);
- upsert(conn1, tableName);
+ upsert(conn1, tableName, true);
// Instrument ServerMetadataCache to throw a SQLException once
cache = ServerMetadataCache.getInstance(config);
@@ -632,7 +647,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// create table and upsert using client-1
createTable(conn1, tableName, NEVER);
- upsert(conn1, tableName);
+ upsert(conn1, tableName, true);
// query using client-2 to populate cache
query(conn2, tableName);
@@ -682,7 +697,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// create table and upsert using client-1
createTable(conn1, tableName, NEVER);
- upsert(conn1, tableName);
+ upsert(conn1, tableName, true);
// Instrument ServerMetadataCache to throw a SQLException twice
cache = ServerMetadataCache.getInstance(config);
@@ -722,7 +737,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
// create table using client-1
createTable(conn1, tableName, NEVER);
- upsert(conn1, tableName);
+ upsert(conn1, tableName, true);
// create 2 level of views using client-1
String view1 = generateUniqueName();
@@ -813,7 +828,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
.addTable(any(PTable.class), anyLong());
//client-1 updates index property
- alterIndexChangeStateToRebuild(conn1, tableName, indexName);
+ alterIndexChangeState(conn1, tableName, indexName, " REBUILD");
//client-2's query using the index should work
PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class);
@@ -911,6 +926,382 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
}
}
+ /**
+ * Test the case when a client upserts into multiple tables before calling commit.
+ * Verify that last ddl timestamp was validated for all involved tables only once.
+ */
+ @Test
+ public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName1 = generateUniqueName();
+ String tableName2 = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ //client-1 creates 2 tables
+ createTable(conn1, tableName1, NEVER);
+ createTable(conn1, tableName2, NEVER);
+
+ //client-2 populates its cache, 1 getTable call for each table
+ query(conn2, tableName1);
+ query(conn2, tableName2);
+
+ //client-1 alters one of the tables
+ alterTableAddColumn(conn1, tableName2, "col3");
+
+ //client-2 upserts multiple rows to both tables before calling commit
+ //verify the table metadata was fetched for each table
+ multiTableUpsert(conn2, tableName1, tableName2);
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName1)),
+ anyLong(), anyLong());
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName2)),
+ anyLong(), anyLong());
+ }
+ }
+
+ /**
+ * Test upserts into a multi-level view hierarchy.
+ */
+ @Test
+ public void testUpsertViewWithOldDDLTimestamp() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ String viewName1 = generateUniqueName();
+ String viewName2 = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ //client-1 creates a table and views
+ createTable(conn1, tableName, NEVER);
+ createView(conn1, tableName, viewName1);
+ createView(conn1, viewName1, viewName2);
+
+ //client-2 populates its cache, 1 getTable RPC each for table, view1, view2
+ query(conn2, viewName2);
+ Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+ anyLong(), anyLong());
+ Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+ anyLong(), anyLong());
+ Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+ anyLong(), anyLong());
+
+ //client-1 alters first level view
+ alterViewAddColumn(conn1, viewName1, "col3");
+
+ //client-2 upserts into second level view
+ //verify there was a getTable RPC for the view and all its ancestors
+ //verify that the client got a StaleMetadataCacheException
+ upsert(conn2, viewName2, true);
+
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+ anyLong(), anyLong());
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+ anyLong(), anyLong());
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+ anyLong(), anyLong());
+ Assert.assertEquals("Client should have encountered a StaleMetadataCacheException",
+ 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
+ //client-2 upserts into first level view
+ //verify no getTable RPCs
+ //verify that the client did not get a StaleMetadataCacheException
+ upsert(conn2, viewName1, true);
+
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+ anyLong(), anyLong());
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+ anyLong(), anyLong());
+ Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+ any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+ anyLong(), anyLong());
+ Assert.assertEquals("Client should not have encountered another StaleMetadataCacheException",
+ 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
+ }
+ }
+
+ /**
+ * Test that upserting into a table which was dropped throws a TableNotFoundException.
+ */
+ @Test
+ public void testUpsertDroppedTable() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ // client-1 creates tables and executes upserts
+ createTable(conn1, tableName, NEVER);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+
+ // client-2 drops the table
+ conn2.createStatement().execute("DROP TABLE " + tableName);
+
+ //client-1 commits
+ conn1.commit();
+ Assert.fail("Commit should have failed with TableNotFoundException");
+ }
+ catch (Exception e) {
+ Assert.assertTrue("TableNotFoundException was not thrown when table was dropped concurrently with upserts.", e instanceof TableNotFoundException);
+ }
+ }
+
+ /**
+ * Client-1 creates a table and executes some upserts.
+ * Client-2 drops a column for which client-1 had executed upserts.
+ * Client-1 calls commit. Verify that client-1 gets a ColumnNotFoundException.
+ */
+ @Test
+ public void testUpsertDroppedTableColumn() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ // client-1 creates tables and executes upserts
+ createTable(conn1, tableName, NEVER);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+
+ // client-2 drops a column
+ alterTableDropColumn(conn2, tableName, "v1");
+
+ //client-1 commits
+ conn1.commit();
+ Assert.fail("Commit should have failed with ColumnNotFoundException");
+ }
+ catch (Exception e) {
+ Assert.assertTrue("ColumnNotFoundException was not thrown when column was dropped concurrently with upserts.", e instanceof ColumnNotFoundException);
+ }
+ }
+
+ /**
+ * Client-1 creates a table and executes some upserts.
+ * Client-2 adds a column to the table.
+ * Client-1 calls commit. Verify that client-1 does not get any errors.
+ */
+ @Test
+ public void testUpsertAddTableColumn() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ // client-1 creates tables and executes upserts
+ createTable(conn1, tableName, NEVER);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+
+ // client-2 adds a column
+ alterTableAddColumn(conn2, tableName, "v5");
+
+ //client-1 commits
+ conn1.commit();
+ }
+ }
+
+ /**
+ * Client-1 creates a table and executes some upserts.
+ * Client-2 creates an index on the table.
+ * Client-1 calls commit. Verify that index mutations were correctly generated
+ */
+ @Test
+ public void testConcurrentUpsertIndexCreation() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ // client-1 creates tables and executes upserts
+ createTable(conn1, tableName, NEVER);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+
+ // client-2 creates an index
+ createIndex(conn2, tableName, indexName, "v1");
+
+ //client-1 commits
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ conn1.commit();
+
+ //verify index rows
+ int tableCount, indexCount;
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+ rs.next();
+ tableCount = rs.getInt(1);
+
+ rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexName);
+ rs.next();
+ indexCount = rs.getInt(1);
+
+ Assert.assertEquals("All index mutations were not generated when index was created concurrently with upserts.", tableCount, indexCount);
+ }
+ }
+
+ /**
+ * Client-1 creates a table, index and executes some upserts.
+ * Client-2 drops the index on the table.
+ * Client-1 calls commit. Verify that client-1 does not see any errors
+ */
+ @Test
+ public void testConcurrentUpsertDropIndex() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ // client-1 creates tables, index and executes upserts
+ createTable(conn1, tableName, NEVER);
+ createIndex(conn1, tableName, indexName, "v1");
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+
+ // client-2 drops the index
+ dropIndex(conn2, tableName, indexName);
+
+ //client-1 commits
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ conn1.commit();
+ }
+ }
+ /**
+ * Client-1 creates a table, index in disabled state and executes some upserts.
+ * Client-2 marks the index as Rebuild.
+ * Client-1 calls commit. Verify that index mutations were correctly generated
+ */
+ @Test
+ public void testConcurrentUpsertIndexStateChange() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ // client-1 creates tables and executes upserts
+ createTable(conn1, tableName, NEVER);
+ createIndex(conn1, tableName, indexName, "v1");
+ alterIndexChangeState(conn1, tableName, indexName, " DISABLE");
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+
+ // client-2 changes the index state to REBUILD
+ alterIndexChangeState(conn2, tableName, indexName, " REBUILD");
+
+ //client-1 commits
+ upsert(conn1, tableName, false);
+ upsert(conn1, tableName, false);
+ conn1.commit();
+
+ //verify index rows
+ int tableCount, indexCount;
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+ rs.next();
+ tableCount = rs.getInt(1);
+
+ rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexName);
+ rs.next();
+ indexCount = rs.getInt(1);
+
+ Assert.assertEquals("All index mutations were not generated when index was created concurrently with upserts.", tableCount, indexCount);
+ }
+ }
+
+ /**
+ * Test that upserting into a view whose parent was dropped throws a TableNotFoundException.
+ */
+ @Test
+ public void testConcurrentUpsertDropView() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+ String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+ String tableName = generateUniqueName();
+ String viewName1 = generateUniqueName();
+ String viewName2 = generateUniqueName();
+ ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+ ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+ try (Connection conn1 = spyCqs1.connect(url1, props);
+ Connection conn2 = spyCqs2.connect(url2, props)) {
+
+ //client-1 creates tables and views
+ createTable(conn1, tableName, NEVER);
+ createView(conn1, tableName, viewName1);
+ createView(conn1, viewName1, viewName2);
+
+ //client-2 upserts into second level view
+ upsert(conn2, viewName2, false);
+
+ //client-1 drops first level view
+ dropView(conn1, viewName1, true);
+
+ //client-2 upserts into second level view and commits
+ upsert(conn2, viewName2, true);
+ }
+ catch (Exception e) {
+ Assert.assertTrue("TableNotFoundException was not thrown when parent view " +
+ "was dropped (cascade) concurrently with upserts.",
+ e instanceof TableNotFoundException);
+ }
+ }
//Helper methods
@@ -943,10 +1334,12 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(" + col + ")");
}
- private void upsert(Connection conn, String tableName) throws SQLException {
+ private void upsert(Connection conn, String tableName, boolean doCommit) throws SQLException {
conn.createStatement().execute("UPSERT INTO " + tableName +
" (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
- conn.commit();
+ if (doCommit) {
+ conn.commit();
+ }
}
private void query(Connection conn, String tableName) throws SQLException {
@@ -964,17 +1357,42 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
+ columnName + " INTEGER");
}
+ private void alterTableDropColumn(Connection conn, String tableName, String columnName) throws SQLException {
+ conn.createStatement().execute("ALTER TABLE " + tableName + " DROP COLUMN " + columnName);
+ }
+
private void alterViewAddColumn(Connection conn, String viewName, String columnName) throws SQLException {
conn.createStatement().execute("ALTER VIEW " + viewName + " ADD IF NOT EXISTS "
+ columnName + " INTEGER");
}
- private void alterIndexChangeStateToRebuild(Connection conn, String tableName, String indexName) throws SQLException, InterruptedException {
- conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + " REBUILD");
- TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE);
+ private void alterIndexChangeState(Connection conn, String tableName, String indexName, String state) throws SQLException, InterruptedException {
+ conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + state);
}
private void dropIndex(Connection conn, String tableName, String indexName) throws SQLException {
conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
}
+
+ private void dropView(Connection conn, String viewName, boolean cascade) throws SQLException {
+ String sql = "DROP VIEW " + viewName;
+ if (cascade) {
+ sql += " CASCADE";
+ }
+ conn.createStatement().execute(sql);
+ }
+
+ private void multiTableUpsert(Connection conn, String tableName1, String tableName2) throws SQLException {
+ conn.createStatement().execute("UPSERT INTO " + tableName1 +
+ " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+ conn.createStatement().execute("UPSERT INTO " + tableName1 +
+ " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+ conn.createStatement().execute("UPSERT INTO " + tableName2 +
+ " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+ conn.createStatement().execute("UPSERT INTO " + tableName1 +
+ " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+ conn.createStatement().execute("UPSERT INTO " + tableName2 +
+ " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+ conn.commit();
+ }
}