You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sh...@apache.org on 2023/11/29 17:54:35 UTC

(phoenix) branch PHOENIX-6883-feature updated: PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726)

This is an automated email from the ASF dual-hosted git repository.

shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
     new 4499c2965a PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726)
4499c2965a is described below

commit 4499c2965af9e07491bfab104b94d8be697d0e35
Author: palash <pa...@gmail.com>
AuthorDate: Wed Nov 29 23:24:30 2023 +0530

    PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726)
---
 .../org/apache/phoenix/execute/MutationState.java  |  66 +++-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  | 135 +------
 .../phoenix/monitoring/GlobalClientMetrics.java    |   4 +-
 .../org/apache/phoenix/monitoring/MetricType.java  |   3 +
 .../phoenix/util/ValidateLastDDLTimestampUtil.java | 211 ++++++++++
 .../phoenix/cache/ServerMetadataCacheTest.java     | 440 ++++++++++++++++++++-
 6 files changed, 708 insertions(+), 151 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1cf23d7e0f..1612fe36a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -68,6 +68,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -122,6 +123,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
 import org.apache.phoenix.util.WALAnnotationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,6 +161,7 @@ public class MutationState implements SQLCloseable {
     private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
+    private boolean validateLastDdlTimestamp;
     private Map<TableRef, List<MultiRowMutationState>> txMutations = Collections.emptyMap();
 
     private PhoenixTransactionContext phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
@@ -221,6 +224,8 @@ public class MutationState implements SQLCloseable {
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
+        this.validateLastDdlTimestamp = ValidateLastDDLTimestampUtil
+                                            .getValidateLastDdlTimestampEnabled(this.connection);
         if (subTask) {
             // this code path is only used while running child scans, we can't pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
@@ -946,26 +951,33 @@ public class MutationState implements SQLCloseable {
                         .setTableName(table.getTableName().getString()).build().buildException(); }
             }
             long timestamp = result.getMutationTime();
-            if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
-                serverTimeStamp = timestamp;
-                if (result.wasUpdated()) {
-                    List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
-                    for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
-                        RowMutationState valueEntry = rowEntry.getValue();
-                        if (valueEntry != null) {
-                            Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
-                            if (colValues != PRow.DELETE_MARKER) {
-                                for (PColumn column : colValues.keySet()) {
-                                    if (!column.isDynamic()) columns.add(column);
+            serverTimeStamp = timestamp;
+
+            /* when last_ddl_timestamp validation is enabled,
+             we don't know if this table's cache result was force updated
+             during the validation, so always validate columns */
+            if ((timestamp != QueryConstants.UNSET_TIMESTAMP && result.wasUpdated())
+                    || this.validateLastDdlTimestamp) {
+                List<PColumn> columns
+                        = Lists.newArrayListWithExpectedSize(table.getColumns().size());
+                for (Map.Entry<ImmutableBytesPtr, RowMutationState>
+                        rowEntry : rowKeyToColumnMap.entrySet()) {
+                    RowMutationState valueEntry = rowEntry.getValue();
+                    if (valueEntry != null) {
+                        Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+                        if (colValues != PRow.DELETE_MARKER) {
+                            for (PColumn column : colValues.keySet()) {
+                                if (!column.isDynamic()) {
+                                    columns.add(column);
                                 }
                             }
                         }
                     }
-                    for (PColumn column : columns) {
-                        if (column != null) {
-                            resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(
-                                    column.getName().getString());
-                        }
+                }
+                for (PColumn column : columns) {
+                    if (column != null) {
+                        resolvedTable.getColumnFamily(column.getFamilyName().getString())
+                                .getPColumnForColumnName(column.getName().getString());
                     }
                 }
             }
@@ -1201,6 +1213,28 @@ public class MutationState implements SQLCloseable {
             commitBatches = createCommitBatches(tableRefIterator);
         }
 
+        //if enabled, validate last ddl timestamps for all tables in the mutationsMap
+        //for now, force update client cache for all tables if StaleMetadataCacheException is seen
+        if (this.validateLastDdlTimestamp) {
+            List<TableRef> tableRefs = new ArrayList<>(this.mutationsMap.keySet());
+            try {
+                ValidateLastDDLTimestampUtil.validateLastDDLTimestamp(
+                        connection, tableRefs, true, true);
+            } catch (StaleMetadataCacheException e) {
+                GlobalClientMetrics
+                        .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.increment();
+                MetaDataClient mc = new MetaDataClient(connection);
+                PName tenantId = connection.getTenantId();
+                LOGGER.debug("Force updating client metadata cache for {}",
+                        ValidateLastDDLTimestampUtil.getInfoString(tenantId, tableRefs));
+                for (TableRef tableRef : tableRefs) {
+                    String schemaName = tableRef.getTable().getSchemaName().toString();
+                    String tableName = tableRef.getTable().getTableName().toString();
+                    mc.updateCache(tenantId, schemaName, tableName, true);
+                }
+            }
+        }
+
         for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatches) {
             long [] serverTimestamps = validateServerTimestamps ? validateAll(commitBatch) : null;
             sendBatch(commitBatch, serverTimestamps, sendAll);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d68e981d10..41981c41c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -64,20 +64,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.compile.BaseMutationPlan;
@@ -107,8 +101,6 @@ import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.compile.TraceQueryPlan;
 import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
-import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.exception.StaleMetadataCacheException;
@@ -128,6 +120,7 @@ import org.apache.phoenix.log.QueryLogInfo;
 import org.apache.phoenix.log.QueryLogger;
 import org.apache.phoenix.log.QueryLoggerUtil;
 import org.apache.phoenix.log.QueryStatus;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.monitoring.TableMetricsManager;
 import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AddColumnStatement;
@@ -206,12 +199,10 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
@@ -233,6 +224,7 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
+import org.apache.phoenix.util.ValidateLastDDLTimestampUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -308,7 +300,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
     public PhoenixStatement(PhoenixConnection connection) {
         this.connection = connection;
         this.queryTimeoutMillis = getDefaultQueryTimeoutMillis();
-        this.validateLastDdlTimestamp = getValidateLastDdlTimestampEnabled();
+        this.validateLastDdlTimestamp = ValidateLastDDLTimestampUtil
+                                            .getValidateLastDdlTimestampEnabled(this.connection);
     }
 
     /**
@@ -320,12 +313,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
             QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
     }
 
-    private boolean getValidateLastDdlTimestampEnabled() {
-        return connection.getQueryServices().getProps()
-                .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
-                        QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED);
-    }
-
     protected List<PhoenixResultSet> getResultSets() {
         return resultSets;
     }
@@ -349,109 +336,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
         return executeQuery(stmt, true, queryLogger, noCommit, this.validateLastDdlTimestamp);
     }
 
-    private String getInfoString(TableRef tableRef) {
-        return String.format("Tenant: %s, Schema: %s, Table: %s",
-                this.connection.getTenantId(),
-                tableRef.getTable().getSchemaName(),
-                tableRef.getTable().getTableName());
-    }
-
-    private void setLastDDLTimestampRequestParameters(
-            RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder, PTable pTable) {
-        byte[] tenantIDBytes = this.connection.getTenantId() == null
-                ? HConstants.EMPTY_BYTE_ARRAY
-                : this.connection.getTenantId().getBytes();
-        byte[] schemaBytes = pTable.getSchemaName() == null
-                                ?   HConstants.EMPTY_BYTE_ARRAY
-                                : pTable.getSchemaName().getBytes();
-        builder.setTenantId(ByteStringer.wrap(tenantIDBytes));
-        builder.setSchemaName(ByteStringer.wrap(schemaBytes));
-        builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes()));
-        builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp());
-    }
-    /**
-     * Build a request for the validateLastDDLTimestamp RPC.
-     * @param tableRef
-     * @return ValidateLastDDLTimestampRequest for the table in tableRef
-     */
-    private RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
-                getValidateDDLTimestampRequest(TableRef tableRef) throws TableNotFoundException {
-        RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
-                = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
-        RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder
-                = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
-
-        //when querying an index, we need to validate its parent table in case the index was dropped
-        if (PTableType.INDEX.equals(tableRef.getTable().getType())) {
-            PTableKey key = new PTableKey(this.connection.getTenantId(),
-                                                tableRef.getTable().getParentName().getString());
-            PTable parentTable = this.connection.getTable(key);
-            setLastDDLTimestampRequestParameters(innerBuilder, parentTable);
-            requestBuilder.addLastDDLTimestampRequests(innerBuilder);
-        }
-
-        innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
-        setLastDDLTimestampRequestParameters(innerBuilder, tableRef.getTable());
-        requestBuilder.addLastDDLTimestampRequests(innerBuilder);
-
-        //when querying a view, we need to validate last ddl timestamps for all its ancestors
-        if (PTableType.VIEW.equals(tableRef.getTable().getType())) {
-            PTable pTable = tableRef.getTable();
-            while (pTable.getParentName() != null) {
-                PTableKey key = new PTableKey(this.connection.getTenantId(),
-                                                pTable.getParentName().getString());
-                PTable parentTable = this.connection.getTable(key);
-                innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
-                setLastDDLTimestampRequestParameters(innerBuilder, parentTable);
-                requestBuilder.addLastDDLTimestampRequests(innerBuilder);
-                pTable = parentTable;
-            }
-        }
-        return requestBuilder.build();
-    }
-
-    /**
-     * Verifies that table metadata in client cache is up-to-date with server.
-     * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
-     * Retry once if there was an error performing the RPC, otherwise throw the Exception.
-     * @param tableRef
-     * @throws SQLException
-     */
-    private void validateLastDDLTimestamp(TableRef tableRef, boolean doRetry) throws SQLException {
-
-        String infoString = getInfoString(tableRef);
-        try (Admin admin = this.connection.getQueryServices().getAdmin()) {
-            // get all live region servers
-            List<ServerName> regionServers
-                    = this.connection.getQueryServices().getLiveRegionServers();
-            // pick one at random
-            ServerName regionServer
-                    = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
-
-            LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
-                    infoString, regionServer);
-
-            // RPC
-            CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
-            PhoenixRegionServerEndpoint.BlockingInterface service
-                    = PhoenixRegionServerEndpoint.newBlockingStub(channel);
-            service.validateLastDDLTimestamp(null, getValidateDDLTimestampRequest(tableRef));
-        } catch (Exception e) {
-            SQLException parsedException = ServerUtil.parseServerException(e);
-            if (parsedException instanceof StaleMetadataCacheException) {
-                throw parsedException;
-            }
-            //retry once for any exceptions other than StaleMetadataCacheException
-            LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
-            if (doRetry) {
-                // update the list of live region servers
-                this.connection.getQueryServices().refreshLiveRegionServers();
-                validateLastDDLTimestamp(tableRef, false);
-                return;
-            }
-            throw parsedException;
-        }
-    }
 
     private PhoenixResultSet executeQuery(final CompilableStatement stmt,
                                           final boolean doRetryOnMetaNotFoundError,
@@ -508,7 +392,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                 //verify metadata for the table/view/index in the query plan
                                 //plan.getTableRef can be null in some cases like EXPLAIN <query>
                                 if (shouldValidateLastDdlTimestamp && plan.getTableRef() != null) {
-                                    validateLastDDLTimestamp(plan.getTableRef(), true);
+                                    ValidateLastDDLTimestampUtil.validateLastDDLTimestamp(
+                                        connection, Arrays.asList(plan.getTableRef()), false, true);
                                 }
 
                                 // this will create its own trace internally, so we don't wrap this
@@ -564,13 +449,17 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                 }
                                 throw e;
                             } catch (StaleMetadataCacheException e) {
+                                GlobalClientMetrics
+                                        .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER
+                                        .increment();
                                 updateMetrics = false;
                                 PTable pTable = lastQueryPlan.getTableRef().getTable();
-                                LOGGER.debug("Force updating client metadata cache for {}",
-                                        getInfoString(getLastQueryPlan().getTableRef()));
                                 String schemaN = pTable.getSchemaName().toString();
                                 String tableN = pTable.getTableName().toString();
                                 PName tenantId = connection.getTenantId();
+                                LOGGER.debug("Force updating client metadata cache for {}",
+                                        ValidateLastDDLTimestampUtil.getInfoString(tenantId,
+                                                Arrays.asList(getLastQueryPlan().getTableRef())));
 
                                 // if the index metadata was stale, we will update the client cache
                                 // for the parent table, which will also add the new index metadata
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 70529dc36c..8763a7c5ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -39,6 +39,7 @@ import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.STALE_METADATA_CACHE_EXCEPTION_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
@@ -159,7 +160,8 @@ public enum GlobalClientMetrics {
     GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER),
 
     GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER),
-    GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER);
+    GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER),
+    GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER(STALE_METADATA_CACHE_EXCEPTION_COUNTER);
 
     private static final Logger LOGGER = LoggerFactory.getLogger(GlobalClientMetrics.class);
     private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index acdba9e551..b7b0c4b562 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -150,6 +150,9 @@ public enum MetricType {
     CLIENT_METADATA_CACHE_MISS_COUNTER("cmcm", "Number of cache misses for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
     CLIENT_METADATA_CACHE_HIT_COUNTER("cmch", "Number of cache hits for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE),
     PAGED_ROWS_COUNTER("prc", "Number of dummy rows returned to client due to paging.", LogLevel.DEBUG, PLong.INSTANCE),
+    STALE_METADATA_CACHE_EXCEPTION_COUNTER("smce",
+            "Number of StaleMetadataCacheException encountered.",
+            LogLevel.DEBUG, PLong.INSTANCE),
 
     // hbase metrics
     COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE),
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
new file mode 100644
index 0000000000..47159fccca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.TableRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for last ddl timestamp validation from the client.
+ */
+public class ValidateLastDDLTimestampUtil {
+
+    private ValidateLastDDLTimestampUtil() {}
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(ValidateLastDDLTimestampUtil.class);
+
+    public static String getInfoString(PName tenantId, List<TableRef> tableRefs) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.format("Tenant: %s, ", tenantId));
+        for (TableRef tableRef : tableRefs) {
+            sb.append(String.format("{Schema: %s, Table: %s},",
+                    tableRef.getTable().getSchemaName(),
+                    tableRef.getTable().getTableName()));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Get whether last ddl timestamp validation is enabled on the connection
+     * @param connection
+     * @return true if it is enabled, false otherwise
+     */
+    public static boolean getValidateLastDdlTimestampEnabled(PhoenixConnection connection) {
+        return connection.getQueryServices().getProps()
+                .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
+                        QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED);
+    }
+
+    /**
+     * Verifies that table metadata for given tables is up-to-date in client cache with server.
+     * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
+     * Retry once if there was an error performing the RPC, otherwise throw the Exception.
+     * @param tableRefs
+     * @param isWritePath
+     * @param doRetry
+     * @throws SQLException
+     */
+    public static void validateLastDDLTimestamp(
+            PhoenixConnection conn, List<TableRef> tableRefs, boolean isWritePath, boolean doRetry)
+            throws SQLException {
+
+        String infoString = getInfoString(conn.getTenantId(), tableRefs);
+        try (Admin admin = conn.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers
+                    = conn.getQueryServices().getLiveRegionServers();
+            // pick one at random
+            ServerName regionServer
+                    = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
+                    infoString, regionServer);
+
+            // RPC
+            CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
+            PhoenixRegionServerEndpoint.BlockingInterface service
+                    = PhoenixRegionServerEndpoint.newBlockingStub(channel);
+            RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request
+                    = getValidateDDLTimestampRequest(conn, tableRefs, isWritePath);
+            service.validateLastDDLTimestamp(null, request);
+        } catch (Exception e) {
+            SQLException parsedException = ServerUtil.parseServerException(e);
+            if (parsedException instanceof StaleMetadataCacheException) {
+                throw parsedException;
+            }
+            //retry once for any exceptions other than StaleMetadataCacheException
+            LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
+            if (doRetry) {
+                // update the list of live region servers
+                conn.getQueryServices().refreshLiveRegionServers();
+                validateLastDDLTimestamp(conn, tableRefs, isWritePath, false);
+                return;
+            }
+            throw parsedException;
+        }
+    }
+
+    /**
+     * Build a request for the validateLastDDLTimestamp RPC for the given tables.
+     * 1. For a view, we need to add all its ancestors to the request
+     *    in case something changed in the hierarchy.
+     * 2. For an index, we need to add its parent table to the request
+     *    in case the index was dropped.
+     * 3. On the write path, we need to add all indexes of a table/view
+     *    in case index state was changed.
+     * @param conn
+     * @param tableRefs
+     * @param isWritePath
+     * @return ValidateLastDDLTimestampRequest for the table in tableRef
+     */
+    private static RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
+        getValidateDDLTimestampRequest(PhoenixConnection conn, List<TableRef> tableRefs,
+                                        boolean isWritePath) throws TableNotFoundException {
+
+        RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
+                = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
+        RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder;
+
+        for (TableRef tableRef : tableRefs) {
+
+            //when querying an index, we need to validate its parent table
+            //in case the index was dropped
+            if (PTableType.INDEX.equals(tableRef.getTable().getType())) {
+                innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+                PTableKey key = new PTableKey(conn.getTenantId(),
+                        tableRef.getTable().getParentName().getString());
+                PTable parentTable = conn.getTable(key);
+                setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), parentTable);
+                requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+            }
+
+            // add the tableRef to the request
+            innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+            setLastDDLTimestampRequestParameters(
+                    innerBuilder, conn.getTenantId(), tableRef.getTable());
+            requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+
+            //when querying a view, we need to validate last ddl timestamps for all its ancestors
+            if (PTableType.VIEW.equals(tableRef.getTable().getType())) {
+                PTable pTable = tableRef.getTable();
+                while (pTable.getParentName() != null) {
+                    PTableKey key = new PTableKey(conn.getTenantId(),
+                            pTable.getParentName().getString());
+                    PTable parentTable = conn.getTable(key);
+                    innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+                    setLastDDLTimestampRequestParameters(
+                            innerBuilder, conn.getTenantId(), parentTable);
+                    requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+                    pTable = parentTable;
+                }
+            }
+
+            //on the write path, we need to validate all indexes of a table/view
+            //in case index state was changed
+            if (isWritePath) {
+                for (PTable idxPTable : tableRef.getTable().getIndexes()) {
+                    innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+                    setLastDDLTimestampRequestParameters(
+                            innerBuilder, conn.getTenantId(), idxPTable);
+                    requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+                }
+            }
+        }
+        return requestBuilder.build();
+    }
+
+    /**
+     * For the given PTable, set the attributes on the LastDDLTimestampRequest.
+     */
+    private static void setLastDDLTimestampRequestParameters(
+            RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder,
+            PName tenantId, PTable pTable) {
+        byte[] tenantIDBytes = tenantId == null
+                ? HConstants.EMPTY_BYTE_ARRAY
+                : tenantId.getBytes();
+        byte[] schemaBytes = pTable.getSchemaName() == null
+                ?   HConstants.EMPTY_BYTE_ARRAY
+                : pTable.getSchemaName().getBytes();
+        builder.setTenantId(ByteStringer.wrap(tenantIDBytes));
+        builder.setSchemaName(ByteStringer.wrap(schemaBytes));
+        builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes()));
+        builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
index 2a618cdcce..6ae19ad144 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.GlobalClientMetrics;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ConnectionProperty;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -36,6 +39,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -77,6 +81,11 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
+    @Before
+    public void resetMetrics() {
+        GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().reset();
+    }
+
     @After
     public void resetMetadataCache() {
         ServerMetadataCache.resetCache();
@@ -545,7 +554,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
             // create table with UCF=never and upsert data using client-1
             createTable(conn1, tableName, NEVER);
-            upsert(conn1, tableName);
+            upsert(conn1, tableName, true);
 
             // select query from client-2 works to populate client side metadata cache
             // there should be 1 update to the client cache
@@ -562,16 +571,22 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
             // select query from client-2 with old ddl timestamp works
             // there should be one update to the client cache
+            //verify client got a StaleMetadataCacheException
             query(conn2, tableName);
             expectedNumCacheUpdates = 1;
             Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
                     .addTable(any(PTable.class), anyLong());
+            Assert.assertEquals("Client should have encountered a StaleMetadataCacheException",
+                    1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
 
             // select query from client-2 with latest ddl timestamp works
             // there should be no more updates to client cache
+            //verify client did not get another StaleMetadataCacheException
             query(conn2, tableName);
             Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
                     .addTable(any(PTable.class), anyLong());
+            Assert.assertEquals("Client should have encountered a StaleMetadataCacheException",
+                    1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
         }
     }
 
@@ -594,7 +609,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
             // create table and upsert using client-1
             createTable(conn1, tableName, NEVER);
-            upsert(conn1, tableName);
+            upsert(conn1, tableName, true);
 
             // Instrument ServerMetadataCache to throw a SQLException once
             cache = ServerMetadataCache.getInstance(config);
@@ -632,7 +647,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
             // create table and upsert using client-1
             createTable(conn1, tableName, NEVER);
-            upsert(conn1, tableName);
+            upsert(conn1, tableName, true);
 
             // query using client-2 to populate cache
             query(conn2, tableName);
@@ -682,7 +697,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
             // create table and upsert using client-1
             createTable(conn1, tableName, NEVER);
-            upsert(conn1, tableName);
+            upsert(conn1, tableName, true);
 
             // Instrument ServerMetadataCache to throw a SQLException twice
             cache = ServerMetadataCache.getInstance(config);
@@ -722,7 +737,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
 
             // create table using client-1
             createTable(conn1, tableName, NEVER);
-            upsert(conn1, tableName);
+            upsert(conn1, tableName, true);
 
             // create 2 level of views using client-1
             String view1 = generateUniqueName();
@@ -813,7 +828,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
                     .addTable(any(PTable.class), anyLong());
 
             //client-1 updates index property
-            alterIndexChangeStateToRebuild(conn1, tableName, indexName);
+            alterIndexChangeState(conn1, tableName, indexName, " REBUILD");
 
             //client-2's query using the index should work
             PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class);
@@ -911,6 +926,382 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
+    /**
+     * Test the case when a client upserts into multiple tables before calling commit.
+     * Verify that last ddl timestamp was validated for all involved tables only once.
+     */
+    @Test
+    public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates 2 tables
+            createTable(conn1, tableName1, NEVER);
+            createTable(conn1, tableName2, NEVER);
+
+            //client-2 populates its cache, 1 getTable call for each table
+            query(conn2, tableName1);
+            query(conn2, tableName2);
+
+            //client-1 alters one of the tables
+            alterTableAddColumn(conn1, tableName2, "col3");
+
+            //client-2 upserts multiple rows to both tables before calling commit
+            //verify the table metadata was fetched for each table
+            multiTableUpsert(conn2, tableName1, tableName2);
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName2)),
+                    anyLong(), anyLong());
+        }
+    }
+
+    /**
+     * Test upserts into a multi-level view hierarchy.
+     */
+    @Test
+    public void testUpsertViewWithOldDDLTimestamp() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates a table and views
+            createTable(conn1, tableName, NEVER);
+            createView(conn1, tableName, viewName1);
+            createView(conn1, viewName1, viewName2);
+
+            //client-2 populates its cache, 1 getTable RPC each for table, view1, view2
+            query(conn2, viewName2);
+            Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+                    anyLong(), anyLong());
+
+            //client-1 alters first level view
+            alterViewAddColumn(conn1, viewName1, "col3");
+
+            //client-2 upserts into second level view
+            //verify there was a getTable RPC for the view and all its ancestors
+            //verify that the client got a StaleMetadataCacheException
+            upsert(conn2, viewName2, true);
+
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+                    anyLong(), anyLong());
+            Assert.assertEquals("Client should have encountered a StaleMetadataCacheException",
+                    1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
+            //client-2 upserts into first level view
+            //verify no getTable RPCs
+            //verify that the client did not get a StaleMetadataCacheException
+            upsert(conn2, viewName1, true);
+
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)),
+                    anyLong(), anyLong());
+            Assert.assertEquals("Client should not have encountered another StaleMetadataCacheException",
+                    1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue());
+        }
+    }
+
+    /**
+     * Test that upserts into a table which was dropped throws a TableNotFoundException.
+     */
+    @Test
+    public void testUpsertDroppedTable() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // client-1 creates tables and executes upserts
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+
+            // client-2 drops the table
+            conn2.createStatement().execute("DROP TABLE " + tableName);
+
+            //client-1 commits
+            conn1.commit();
+            Assert.fail("Commit should have failed with TableNotFoundException");
+        }
+        catch (Exception e) {
+            Assert.assertTrue("TableNotFoundException was not thrown when table was dropped concurrently with upserts.", e instanceof TableNotFoundException);
+        }
+    }
+
+    /**
+     * Client-1 creates a table and executes some upserts.
+     * Client-2 drops a column for which client-1 had executed upserts.
+     * Client-1 calls commit. Verify that client-1 gets ColumnNotFoundException
+     */
+    @Test
+    public void testUpsertDroppedTableColumn() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // client-1 creates tables and executes upserts
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+
+            // client-2 drops a column
+            alterTableDropColumn(conn2, tableName, "v1");
+
+            //client-1 commits
+            conn1.commit();
+            Assert.fail("Commit should have failed with ColumnNotFoundException");
+        }
+        catch (Exception e) {
+            Assert.assertTrue("ColumnNotFoundException was not thrown when column was dropped concurrently with upserts.", e instanceof ColumnNotFoundException);
+        }
+    }
+
+    /**
+     * Client-1 creates a table and executes some upserts.
+     * Client-2 adds a column to the table.
+     * Client-1 calls commit. Verify that client-1 does not get any errors.
+     */
+    @Test
+    public void testUpsertAddTableColumn() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // client-1 creates tables and executes upserts
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+
+            // client-2 adds a column
+            alterTableAddColumn(conn2, tableName, "v5");
+
+            //client-1 commits
+            conn1.commit();
+        }
+    }
+
+    /**
+     * Client-1 creates a table and executes some upserts.
+     * Client-2 creates an index on the table.
+     * Client-1 calls commit. Verify that index mutations were correctly generated
+     */
+    @Test
+    public void testConcurrentUpsertIndexCreation() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // client-1 creates tables and executes upserts
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+
+            // client-2 creates an index
+            createIndex(conn2, tableName, indexName, "v1");
+
+            //client-1 commits
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            conn1.commit();
+
+            //verify index rows
+            int tableCount, indexCount;
+            ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+            rs.next();
+            tableCount = rs.getInt(1);
+
+            rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexName);
+            rs.next();
+            indexCount = rs.getInt(1);
+
+            Assert.assertEquals("All index mutations were not generated when index was created concurrently with upserts.", tableCount, indexCount);
+        }
+    }
+
+    /**
+     * Client-1 creates a table, index and executes some upserts.
+     * Client-2 drops the index on the table.
+     * Client-1 calls commit. Verify that client-1 does not see any errors
+     */
+    @Test
+    public void testConcurrentUpsertDropIndex() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // client-1 creates tables, index and executes upserts
+            createTable(conn1, tableName, NEVER);
+            createIndex(conn1, tableName, indexName, "v1");
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+
+            // client-2 drops the index
+            dropIndex(conn2, tableName, indexName);
+
+            //client-1 commits
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            conn1.commit();
+        }
+    }
+    /**
+     * Client-1 creates a table, index in disabled state and executes some upserts.
+     * Client-2 marks the index as Rebuild.
+     * Client-1 calls commit. Verify that index mutations were correctly generated
+     */
+    @Test
+    public void testConcurrentUpsertIndexStateChange() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // client-1 creates tables and executes upserts
+            createTable(conn1, tableName, NEVER);
+            createIndex(conn1, tableName, indexName, "v1");
+            alterIndexChangeState(conn1, tableName, indexName, " DISABLE");
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+
+            // client-2 creates an index
+            alterIndexChangeState(conn2, tableName, indexName, " REBUILD");
+
+            //client-1 commits
+            upsert(conn1, tableName, false);
+            upsert(conn1, tableName, false);
+            conn1.commit();
+
+            //verify index rows
+            int tableCount, indexCount;
+            ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+            rs.next();
+            tableCount = rs.getInt(1);
+
+            rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexName);
+            rs.next();
+            indexCount = rs.getInt(1);
+
+            Assert.assertEquals("All index mutations were not generated when index was created concurrently with upserts.", tableCount, indexCount);
+        }
+    }
+
+    /**
+     * Test that upserts into a view whose parent was dropped throws a TableNotFoundException.
+     */
+    @Test
+    public void testConcurrentUpsertDropView() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates tables and views
+            createTable(conn1, tableName, NEVER);
+            createView(conn1, tableName, viewName1);
+            createView(conn1, viewName1, viewName2);
+
+            //client-2 upserts into second level view
+            upsert(conn2, viewName2, false);
+
+            //client-1 drop first level view
+            dropView(conn1, viewName1, true);
+
+            //client-2 upserts into second level view and commits
+            upsert(conn2, viewName2, true);
+        }
+        catch (Exception e) {
+            Assert.assertTrue("TableNotFoundException was not thrown when parent view " +
+                            "was dropped (cascade) concurrently with upserts.",
+                    e instanceof TableNotFoundException);
+        }
+    }
 
     //Helper methods
 
@@ -943,10 +1334,12 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(" + col + ")");
     }
 
-    private void upsert(Connection conn, String tableName) throws SQLException {
+    private void upsert(Connection conn, String tableName, boolean doCommit) throws SQLException {
         conn.createStatement().execute("UPSERT INTO " + tableName +
                 " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
-        conn.commit();
+        if (doCommit) {
+            conn.commit();
+        }
     }
 
     private void query(Connection conn, String tableName) throws SQLException {
@@ -964,17 +1357,42 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
                 + columnName + " INTEGER");
     }
 
+    private void alterTableDropColumn(Connection conn, String tableName, String columnName) throws SQLException {
+        conn.createStatement().execute("ALTER TABLE " + tableName + " DROP COLUMN " + columnName);
+    }
+
     private void alterViewAddColumn(Connection conn, String viewName, String columnName) throws SQLException {
         conn.createStatement().execute("ALTER VIEW " + viewName + " ADD IF NOT EXISTS "
                 + columnName + " INTEGER");
     }
 
-    private void alterIndexChangeStateToRebuild(Connection conn, String tableName, String indexName) throws SQLException, InterruptedException {
-        conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + " REBUILD");
-        TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE);
+    private void alterIndexChangeState(Connection conn, String tableName, String indexName, String state) throws SQLException, InterruptedException {
+        conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + state);
     }
 
     private void dropIndex(Connection conn, String tableName, String indexName) throws SQLException {
         conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
     }
+
+    private void dropView(Connection conn, String viewName, boolean cascade) throws SQLException {
+        String sql = "DROP VIEW " + viewName;
+        if (cascade) {
+            sql += " CASCADE";
+        }
+        conn.createStatement().execute(sql);
+    }
+
+    private void multiTableUpsert(Connection conn, String tableName1, String tableName2) throws SQLException {
+        conn.createStatement().execute("UPSERT INTO " + tableName1 +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.createStatement().execute("UPSERT INTO " + tableName1 +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.createStatement().execute("UPSERT INTO " + tableName2 +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.createStatement().execute("UPSERT INTO " + tableName1 +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.createStatement().execute("UPSERT INTO " + tableName2 +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.commit();
+    }
 }