You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/04/17 23:08:17 UTC

[phoenix] branch master updated: PHOENIX-5496 Ensure that we handle all server-side mutation codes on the client(for create table)

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new c627f94  PHOENIX-5496 Ensure that we handle all server-side mutation codes on the client(for create table)
c627f94 is described below

commit c627f94b50802bde493bfef1a2a090d9568eff6f
Author: Neha <ne...@salesforce.com>
AuthorDate: Wed Apr 15 18:42:59 2020 -0700

    PHOENIX-5496 Ensure that we handle all server-side mutation codes on the client(for create table)
    
    Signed-off-by: s.kadam <s....@apache.org>
---
 .../apache/phoenix/exception/SQLExceptionCode.java |   4 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  | 278 ++++++++++++---------
 .../apache/phoenix/schema/MetaDataClientTest.java  |  86 +++++++
 3 files changed, 247 insertions(+), 121 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 3bdf905..2f271c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -424,7 +424,8 @@ public enum SQLExceptionCode {
     UNABLE_TO_UPDATE_PARENT_TABLE(1143, "XCL43", "Error Updating the parent table"),
     UNABLE_TO_DELETE_CHILD_LINK(1144, "XCL44", "Error deleting parent-child link (Link type=" +
             PTable.LinkType.CHILD_TABLE + ") for view"),
-
+    TABLE_NOT_IN_REGION(1145, "XCL45", "No modifications allowed on this table. "
+    + "Table not in this region."),
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
      */
@@ -596,4 +597,5 @@ public enum SQLExceptionCode {
         }
         return code;
     }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index f6eb1ae..865408c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -137,9 +137,10 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.HashSet;
-
 import org.apache.hadoop.conf.Configuration;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.JsonObject;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -2997,149 +2998,186 @@ public class MetaDataClient {
                 splits = SchemaUtil.processSplits(splits, pkColumns, saltBucketNum, connection.getQueryServices().getProps().getBoolean(
                         QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER));
             }
-            MetaDataMutationResult result = connection.getQueryServices().createTable(
-                    tableMetaData,
-                    viewType == ViewType.MAPPED || allocateIndexId ? physicalNames.get(0).getBytes() : null,
-                    tableType, tableProps, familyPropList, splits, isNamespaceMapped, allocateIndexId,
-                    UpgradeUtil.isNoUpgradeSet(connection.getClientInfo()), parent);
+
+            // Handling of the returned mutation code is modularized below for unit testing
+            MetaDataMutationResult result = connection.getQueryServices().createTable(tableMetaData
+                ,viewType == ViewType.MAPPED || allocateIndexId ? physicalNames.get(0).getBytes()
+                : null, tableType, tableProps, familyPropList, splits, isNamespaceMapped,
+                allocateIndexId, UpgradeUtil.isNoUpgradeSet(connection.getClientInfo()), parent);
             MutationCode code = result.getMutationCode();
-            switch(code) {
+            if (code != MutationCode.TABLE_NOT_FOUND) {
+                boolean tableAlreadyExists = handleCreateTableMutationCode(result, code, statement,
+                        schemaName, tableName, parent);
+                if(tableAlreadyExists) {
+                    return null;
+                }
+            }
+            // If the parent table of the view has the auto partition sequence name attribute,
+            // set the view statement and relevant partition column attributes correctly
+            if (parent!=null && parent.getAutoPartitionSeqName()!=null) {
+                final PColumn autoPartitionCol = parent.getPKColumns().get(MetaDataUtil
+                        .getAutoPartitionColIndex(parent));
+                final Long autoPartitionNum = Long.valueOf(result.getAutoPartitionNum());
+                columns.put(autoPartitionCol, new DelegateColumn(autoPartitionCol) {
+                    @Override
+                    public byte[] getViewConstant() {
+                        PDataType dataType = autoPartitionCol.getDataType();
+                        Object val = dataType.toObject(autoPartitionNum, PLong.INSTANCE);
+                        byte[] bytes = new byte [dataType.getByteSize() + 1];
+                        dataType.toBytes(val, bytes, 0);
+                        return bytes;
+                    }
+                    @Override
+                    public boolean isViewReferenced() {
+                        return true;
+                    }
+                });
+                String viewPartitionClause = QueryUtil.getViewPartitionClause(MetaDataUtil
+                        .getAutoPartitionColumnName(parent), autoPartitionNum);
+                if (viewStatement!=null) {
+                    viewStatement = viewStatement + " AND " + viewPartitionClause;
+                }
+                else {
+                    viewStatement = QueryUtil.getViewStatement(parent.getSchemaName().getString(),
+                            parent.getTableName().getString(), viewPartitionClause);
+                }
+            }
+            PName newSchemaName = PNameFactory.newName(schemaName);
+            /*
+             * It doesn't hurt for the PTable of views to have the cqCounter. However, views always
+             * rely on the parent table's counter to dole out encoded column qualifiers. So setting
+             * the counter as NULL_COUNTER for extra safety.
+             */
+            EncodedCQCounter cqCounterToBe = tableType == PTableType.VIEW ? NULL_COUNTER :
+                    cqCounter;
+            PTable table = new PTableImpl.Builder()
+                    .setType(tableType)
+                    .setState(indexState)
+                    .setTimeStamp(timestamp != null ? timestamp : result.getMutationTime())
+                    .setIndexDisableTimestamp(0L)
+                    .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
+                    .setImmutableRows(isImmutableRows)
+                    .setViewStatement(viewStatement)
+                    .setDisableWAL(Boolean.TRUE.equals(disableWAL))
+                    .setMultiTenant(multiTenant)
+                    .setStoreNulls(storeNulls)
+                    .setViewType(viewType)
+                    .setViewIndexIdType(viewIndexIdType)
+                    .setViewIndexId(result.getViewIndexId())
+                    .setIndexType(indexType)
+                    .setTransactionProvider(transactionProvider)
+                    .setUpdateCacheFrequency(updateCacheFrequency)
+                    .setNamespaceMapped(isNamespaceMapped)
+                    .setAutoPartitionSeqName(autoPartitionSeq)
+                    .setAppendOnlySchema(isAppendOnlySchema)
+                    .setImmutableStorageScheme(immutableStorageScheme == null ?
+                            ImmutableStorageScheme.ONE_CELL_PER_COLUMN : immutableStorageScheme)
+                    .setQualifierEncodingScheme(encodingScheme == null ?
+                            QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : encodingScheme)
+                    .setBaseColumnCount(baseTableColumnCount)
+                    .setEncodedCQCounter(cqCounterToBe)
+                    .setUseStatsForParallelization(useStatsForParallelizationProp)
+                    .setExcludedColumns(ImmutableList.of())
+                    .setTenantId(tenantId)
+                    .setSchemaName(newSchemaName)
+                    .setTableName(PNameFactory.newName(tableName))
+                    .setPkName(pkName == null ? null : PNameFactory.newName(pkName))
+                    .setDefaultFamilyName(defaultFamilyName == null ?
+                            null : PNameFactory.newName(defaultFamilyName))
+                    .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
+                    .setBucketNum(saltBucketNum)
+                    .setIndexes(Collections.emptyList())
+                    .setParentSchemaName((parent == null) ? null : parent.getSchemaName())
+                    .setParentTableName((parent == null) ? null : parent.getTableName())
+                    .setPhysicalNames(physicalNames == null ?
+                            ImmutableList.of() : ImmutableList.copyOf(physicalNames))
+                    .setColumns(columns.values())
+                    .setViewTTL(viewTTL == null ? VIEW_TTL_NOT_DEFINED : viewTTL)
+                    .setViewTTLHighWaterMark(viewTTLHighWaterMark == null ? MIN_VIEW_TTL_HWM :
+                            viewTTLHighWaterMark)
+                    .setViewModifiedUpdateCacheFrequency(tableType ==  PTableType.VIEW &&
+                            parent != null &&
+                            parent.getUpdateCacheFrequency() != updateCacheFrequency)
+                    .setViewModifiedUseStatsForParallelization(tableType ==  PTableType.VIEW &&
+                            parent != null &&
+                            parent.useStatsForParallelization()
+                                    != useStatsForParallelizationProp)
+                    .setViewModifiedViewTTL(tableType ==  PTableType.VIEW &&
+                            parent != null && viewTTL != null &&
+                            parent.getViewTTL() != viewTTL)
+                    .build();
+            result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
+            addTableToCache(result);
+            return table;
+        } finally {
+            connection.setAutoCommit(wasAutoCommit);
+            deleteMutexCells(parentPhysicalSchemaName, parentPhysicalTableName,
+                    acquiredColumnMutexSet);
+        }
+    }
+
+    /* Handles the mutation codes sent by the Phoenix server for CREATE TABLE, except for
+    * TABLE_NOT_FOUND, which the caller treats as the success code. For TABLE_ALREADY_EXISTS the
+    * returned table (if any) is added to the cache and, if the statement has IF NOT EXISTS, true
+    * is returned so that the caller stops. For NEWER_TABLE_FOUND with IF NOT EXISTS, false is
+    * returned so that the caller goes on to build and cache the table. Without IF NOT EXISTS,
+    * both codes throw. The other handled codes throw the related SQLException. A mutation code
+    * that the client does not handle throws a SQLException stating the server-side code.
+    */
+    @VisibleForTesting
+    public boolean handleCreateTableMutationCode(MetaDataMutationResult result, MutationCode code,
+                 CreateTableStatement statement, String schemaName, String tableName,
+                 PTable parent) throws SQLException {
+        switch(code) {
             case TABLE_ALREADY_EXISTS:
-                if (result.getTable() != null) { // Can happen for transactional table that already exists as HBase table
+                if(result.getTable() != null) {
                     addTableToCache(result);
                 }
-                if (!statement.ifNotExists()) {
+                if(!statement.ifNotExists()) {
                     throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
                 }
-                return null;
+                return true;
             case NEWER_TABLE_FOUND:
                 // Add table to ConnectionQueryServices so it's cached, but don't add
                 // it to this connection as we can't see it.
                 if (!statement.ifNotExists()) {
-                    throw new NewerTableAlreadyExistsException(schemaName, tableName, result.getTable());
+                    throw new NewerTableAlreadyExistsException(schemaName, tableName,
+                            result.getTable());
                 }
+                return false;
             case UNALLOWED_TABLE_MUTATION:
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MUTATE_TABLE)
-                    .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                throwsSQLExceptionUtil("CANNOT_MUTATE_TABLE",schemaName,tableName);
             case CONCURRENT_TABLE_MUTATION:
                 addTableToCache(result);
                 throw new ConcurrentTableMutationException(schemaName, tableName);
             case AUTO_PARTITION_SEQUENCE_NOT_FOUND:
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_PARTITION_SEQUENCE_UNDEFINED)
-                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+                       .setSchemaName(schemaName).setTableName(tableName).build().buildException();
             case CANNOT_COERCE_AUTO_PARTITION_ID:
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_COERCE_AUTO_PARTITION_ID)
-                .setSchemaName(schemaName).setTableName(tableName).build().buildException();
+            case UNABLE_TO_CREATE_CHILD_LINK:
+            case PARENT_TABLE_NOT_FOUND:
+            case TABLE_NOT_IN_REGION:
+                throwsSQLExceptionUtil(String.valueOf(code), schemaName, tableName);
             case TOO_MANY_INDEXES:
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.TOO_MANY_INDEXES)
-                        .setSchemaName(SchemaUtil.getSchemaNameFromFullName(parent.getPhysicalName().getString()))
-                        .setTableName(SchemaUtil.getTableNameFromFullName(parent.getPhysicalName().getString())).build()
-                        .buildException();
             case UNABLE_TO_UPDATE_PARENT_TABLE:
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNABLE_TO_UPDATE_PARENT_TABLE)
-                        .setSchemaName(SchemaUtil.getSchemaNameFromFullName(parent.getPhysicalName().getString()))
-                        .setTableName(SchemaUtil.getTableNameFromFullName(parent.getPhysicalName().getString())).build()
-                        .buildException();
+                throwsSQLExceptionUtil(String.valueOf(code), SchemaUtil.getSchemaNameFromFullName(
+                parent.getPhysicalName().getString()),SchemaUtil.getTableNameFromFullName(
+                parent.getPhysicalName().getString()));
             default:
-                // If the parent table of the view has the auto partition sequence name attribute,
-                // set the view statement and relevant partition column attributes correctly
-                if (parent!=null && parent.getAutoPartitionSeqName()!=null) {
-                    final PColumn autoPartitionCol = parent.getPKColumns().get(MetaDataUtil.getAutoPartitionColIndex(parent));
-                    final Long autoPartitionNum = Long.valueOf(result.getAutoPartitionNum());
-                    columns.put(autoPartitionCol, new DelegateColumn(autoPartitionCol) {
-                        @Override
-                        public byte[] getViewConstant() {
-                            PDataType dataType = autoPartitionCol.getDataType();
-                            Object val = dataType.toObject(autoPartitionNum, PLong.INSTANCE);
-                            byte[] bytes = new byte [dataType.getByteSize() + 1];
-                            dataType.toBytes(val, bytes, 0);
-                            return bytes;
-                        }
-                        @Override
-                        public boolean isViewReferenced() {
-                            return true;
-                        }
-                    });
-                    String viewPartitionClause = QueryUtil.getViewPartitionClause(MetaDataUtil.getAutoPartitionColumnName(parent), autoPartitionNum);
-                    if (viewStatement!=null) {
-                        viewStatement = viewStatement + " AND " + viewPartitionClause;
-                    }
-                    else {
-                        viewStatement = QueryUtil.getViewStatement(parent.getSchemaName().getString(), parent.getTableName().getString(), viewPartitionClause);
-                    }
-                }
-                PName newSchemaName = PNameFactory.newName(schemaName);
-                /*
-                 * It doesn't hurt for the PTable of views to have the cqCounter. However, views always rely on the
-                 * parent table's counter to dole out encoded column qualifiers. So setting the counter as NULL_COUNTER
-                 * for extra safety.
-                 */
-                EncodedCQCounter cqCounterToBe = tableType == PTableType.VIEW ? NULL_COUNTER : cqCounter;
-                PTable table = new PTableImpl.Builder()
-                        .setType(tableType)
-                        .setState(indexState)
-                        .setTimeStamp(timestamp != null ? timestamp : result.getMutationTime())
-                        .setIndexDisableTimestamp(0L)
-                        .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
-                        .setImmutableRows(isImmutableRows)
-                        .setViewStatement(viewStatement)
-                        .setDisableWAL(Boolean.TRUE.equals(disableWAL))
-                        .setMultiTenant(multiTenant)
-                        .setStoreNulls(storeNulls)
-                        .setViewType(viewType)
-                        .setViewIndexIdType(viewIndexIdType)
-                        .setViewIndexId(result.getViewIndexId())
-                        .setIndexType(indexType)
-                        .setTransactionProvider(transactionProvider)
-                        .setUpdateCacheFrequency(updateCacheFrequency)
-                        .setNamespaceMapped(isNamespaceMapped)
-                        .setAutoPartitionSeqName(autoPartitionSeq)
-                        .setAppendOnlySchema(isAppendOnlySchema)
-                        .setImmutableStorageScheme(immutableStorageScheme == null ?
-                                ImmutableStorageScheme.ONE_CELL_PER_COLUMN : immutableStorageScheme)
-                        .setQualifierEncodingScheme(encodingScheme == null ?
-                                QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : encodingScheme)
-                        .setBaseColumnCount(baseTableColumnCount)
-                        .setEncodedCQCounter(cqCounterToBe)
-                        .setUseStatsForParallelization(useStatsForParallelizationProp)
-                        .setExcludedColumns(ImmutableList.of())
-                        .setTenantId(tenantId)
-                        .setSchemaName(newSchemaName)
-                        .setTableName(PNameFactory.newName(tableName))
-                        .setPkName(pkName == null ? null : PNameFactory.newName(pkName))
-                        .setDefaultFamilyName(defaultFamilyName == null ?
-                                null : PNameFactory.newName(defaultFamilyName))
-                        .setRowKeyOrderOptimizable(rowKeyOrderOptimizable)
-                        .setBucketNum(saltBucketNum)
-                        .setIndexes(Collections.emptyList())
-                        .setParentSchemaName((parent == null) ? null : parent.getSchemaName())
-                        .setParentTableName((parent == null) ? null : parent.getTableName())
-                        .setPhysicalNames(physicalNames == null ?
-                                ImmutableList.of() : ImmutableList.copyOf(physicalNames))
-                        .setColumns(columns.values())
-                        .setViewTTL(viewTTL == null ? VIEW_TTL_NOT_DEFINED : viewTTL)
-                        .setViewTTLHighWaterMark(viewTTLHighWaterMark == null ? MIN_VIEW_TTL_HWM : viewTTLHighWaterMark)
-                        .setViewModifiedUpdateCacheFrequency(tableType ==  PTableType.VIEW &&
-                                parent != null &&
-                                parent.getUpdateCacheFrequency() != updateCacheFrequency)
-                        .setViewModifiedUseStatsForParallelization(tableType ==  PTableType.VIEW &&
-                                parent != null &&
-                                parent.useStatsForParallelization()
-                                        != useStatsForParallelizationProp)
-                        .setViewModifiedViewTTL(tableType ==  PTableType.VIEW &&
-                                parent != null && viewTTL != null &&
-                                parent.getViewTTL() != viewTTL)
-                        .build();
-                result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
-                addTableToCache(result);
-                return table;
-            }
-        } finally {
-            connection.setAutoCommit(wasAutoCommit);
-            deleteMutexCells(parentPhysicalSchemaName, parentPhysicalTableName, acquiredColumnMutexSet);
+                // Cannot map the code by name (SQLExceptionCode.valueOf) here since not all
+                // mutation codes have their corresponding codes in the enum SQLExceptionCode
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNEXPECTED_MUTATION_CODE)
+                .setSchemaName(schemaName).setTableName(tableName).setMessage("mutation code: "
+                + code).build().buildException();
         }
     }
 
+    /**
+     * Builds and throws the SQLException for the SQLExceptionCode constant named by
+     * {@code code}, carrying the given schema and table names.
+     *
+     * @param code name of the SQLExceptionCode constant, resolved with
+     *             SQLExceptionCode.valueOf; a name with no matching constant makes valueOf
+     *             throw IllegalArgumentException instead
+     * @param schemaName schema name set on the exception info
+     * @param tableName table name set on the exception info
+     * @throws SQLException whenever {@code code} resolves to a constant
+     */
+    private void throwsSQLExceptionUtil(String code,String schemaName, String tableName)
+            throws SQLException {
+        SQLExceptionCode exceptionCode = SQLExceptionCode.valueOf(code);
+        SQLExceptionInfo.Builder infoBuilder = new SQLExceptionInfo.Builder(exceptionCode);
+        throw infoBuilder
+                .setSchemaName(schemaName)
+                .setTableName(tableName)
+                .build()
+                .buildException();
+    }
+
     private static boolean isPkColumn(PrimaryKeyConstraint pkConstraint, ColumnDef colDef) {
         return colDef.isPK() || (pkConstraint != null && pkConstraint.contains(colDef.getColumnDefName()));
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/MetaDataClientTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/MetaDataClientTest.java
new file mode 100644
index 0000000..1f408b2
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/MetaDataClientTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.PSchema;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.fail;
+
+/**
+ * Unit tests for {@link MetaDataClient#handleCreateTableMutationCode}, which maps the mutation
+ * code returned by the server for CREATE TABLE to a result or a SQLException.
+ */
+public class MetaDataClientTest extends BaseConnectionlessQueryTest {
+
+    private static String schema;
+    private static String baseTable;
+    private static PhoenixConnection phxConn;
+    // A real MetaDataClient built on phxConn (not a mocking-framework mock)
+    private static MetaDataClient mockClient;
+    private static String ddlFormat;
+    private static CreateTableStatement stmt;
+
+    /**
+     * Creates the shared connection and client, and parses the CREATE TABLE statement that is
+     * passed to the method under test.
+     */
+    @BeforeClass
+    public static void setupTest() throws SQLException {
+        schema = generateUniqueName();
+        baseTable = generateUniqueName();
+        phxConn = (PhoenixConnection) DriverManager.getConnection(getUrl());
+        mockClient = new MetaDataClient(phxConn);
+        ddlFormat = "CREATE TABLE " + schema + "." + baseTable + " " +
+                "(A VARCHAR PRIMARY KEY, B BIGINT, C VARCHAR)";
+        stmt = (CreateTableStatement) new SQLParser(ddlFormat).parseStatement();
+    }
+
+    /**
+     * UNALLOWED_TABLE_MUTATION must surface as a SQLException carrying the
+     * CANNOT_MUTATE_TABLE error code.
+     */
+    @Test
+    public void testHandleCreateTableMutationCode() throws SQLException {
+        MetaDataProtocol.MetaDataMutationResult result = new MetaDataProtocol.MetaDataMutationResult
+                (MetaDataProtocol.MutationCode.UNALLOWED_TABLE_MUTATION ,new PSchema(schema),
+            EnvironmentEdgeManager.currentTimeMillis());
+        try {
+            mockClient.handleCreateTableMutationCode(result, result.getMutationCode(), stmt,
+                    schema, baseTable, null);
+            fail("Expected SQLException for UNALLOWED_TABLE_MUTATION");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_MUTATE_TABLE.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    /**
+     * Tests the case when the mutation code thrown from the server is not handled by
+     * MetaDataClient: it must surface as UNEXPECTED_MUTATION_CODE with the code in the message.
+     */
+    @Test
+    public void testHandleCreateTableMutationCodeWithNewCode() throws SQLException {
+        MetaDataProtocol.MetaDataMutationResult result = new MetaDataProtocol
+                .MetaDataMutationResult(MetaDataProtocol.MutationCode.NO_PK_COLUMNS,
+                new PSchema(schema), EnvironmentEdgeManager.currentTimeMillis());
+        try {
+            mockClient.handleCreateTableMutationCode(result, result.getMutationCode(), stmt,
+                    schema, baseTable, null);
+            // Without this, the test would pass silently if no exception were thrown
+            fail("Expected SQLException for unhandled mutation code NO_PK_COLUMNS");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.UNEXPECTED_MUTATION_CODE.getErrorCode(),
+                    e.getErrorCode());
+            assertTrue(e.getMessage().contains("NO_PK_COLUMNS"));
+        }
+    }
+
+}