You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/09/24 15:28:40 UTC
[35/50] [abbrv] phoenix git commit: PHOENIX-4798 Update encoded col
qualifiers on the base table correctly
PHOENIX-4798 Update encoded col qualifiers on the base table correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b0cc455c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b0cc455c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b0cc455c
Branch: refs/heads/omid2
Commit: b0cc455c93df4bd13a7db5a8a21496f2eb170500
Parents: 46f50d2
Author: Thomas D'Silva <td...@apache.org>
Authored: Wed Aug 15 12:23:56 2018 -0700
Committer: Thomas D'Silva <td...@apache.org>
Committed: Mon Aug 27 15:30:26 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/phoenix/end2end/ViewIT.java | 184 +++++++++++++------
.../coprocessor/MetaDataEndpointImpl.java | 74 +++++---
.../PhoenixMetaDataCoprocessorHost.java | 2 +-
.../apache/phoenix/schema/MetaDataClient.java | 33 +++-
4 files changed, 208 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0cc455c/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index fda9490..c1a7ff5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -59,19 +60,16 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseMetaDataEndpointObserver;
+import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost;
+import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
import org.apache.phoenix.exception.PhoenixIOException;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
@@ -98,36 +96,39 @@ import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Maps;
-
@RunWith(Parameterized.class)
public class ViewIT extends SplitSystemCatalogIT {
protected String tableDDLOptions;
protected boolean transactional;
+ protected boolean columnEncoded;
- private static final String FAILED_VIEWNAME = "FAILED_VIEW";
- private static final byte[] FAILED_ROWKEY_BYTES =
- SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2), Bytes.toBytes(FAILED_VIEWNAME));
- private static final String SLOW_VIEWNAME_PREFIX = "SLOW_VIEW";
- private static final byte[] SLOW_ROWKEY_PREFIX_BYTES =
- SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2),
- Bytes.toBytes(SLOW_VIEWNAME_PREFIX));
+ private static final String FAILED_VIEWNAME = SchemaUtil.getTableName(SCHEMA2, "FAILED_VIEW");
+ private static final String SLOW_VIEWNAME_PREFIX = SchemaUtil.getTableName(SCHEMA2, "SLOW_VIEW");
private static volatile CountDownLatch latch1 = null;
private static volatile CountDownLatch latch2 = null;
- public ViewIT(boolean transactional) {
+ public ViewIT(boolean transactional, boolean columnEncoded) {
StringBuilder optionBuilder = new StringBuilder();
this.transactional = transactional;
+ this.columnEncoded = columnEncoded;
if (transactional) {
optionBuilder.append(" TRANSACTIONAL=true ");
}
+ if (!columnEncoded) {
+ if (optionBuilder.length()!=0)
+ optionBuilder.append(",");
+ optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+ }
this.tableDDLOptions = optionBuilder.toString();
}
- @Parameters(name = "transactional = {0}")
- public static Collection<Boolean> data() {
- return Arrays.asList(new Boolean[] { false, true });
+ @Parameters(name="ViewIT_transactional={0}, columnEncoded={1}") // name is used by failsafe as file name in reports
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] {
+ { true, false }, { true, true },
+ { false, false }, { false, true }});
}
@BeforeClass
@@ -136,7 +137,9 @@ public class ViewIT extends SplitSystemCatalogIT {
Map<String, String> props = Collections.emptyMap();
boolean splitSystemCatalog = (driver == null);
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
- serverProps.put("hbase.coprocessor.region.classes", TestMetaDataRegionObserver.class.getName());
+ serverProps.put(QueryServices.PHOENIX_ACLS_ENABLED, "true");
+ serverProps.put(PhoenixMetaDataCoprocessorHost.PHOENIX_META_DATA_COPROCESSOR_CONF_KEY,
+ TestMetaDataRegionObserver.class.getName());
serverProps.put("hbase.coprocessor.abortonerror", "false");
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(props.entrySet().iterator()));
// Split SYSTEM.CATALOG once after the mini-cluster is started
@@ -145,17 +148,36 @@ public class ViewIT extends SplitSystemCatalogIT {
}
}
- public static class TestMetaDataRegionObserver extends BaseRegionObserver {
+ public static class TestMetaDataRegionObserver extends BaseMetaDataEndpointObserver {
+
+ @Override
+ public void preAlterTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+ String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType type) throws IOException{
+ processTable(tableName);
+ }
+
+ @Override
+ public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+ String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+ Set<byte[]> familySet, Set<TableName> indexes) throws IOException {
+ processTable(tableName);
+ }
+
@Override
- public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- if (shouldFail(c, miniBatchOp.getOperation(0))) {
+ public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String tenantId,
+ String tableName, TableName physicalTableName, TableName parentPhysicalTableName, PTableType tableType,
+ List<PTable> indexes) throws IOException {
+ processTable(tableName);
+ }
+
+ private void processTable(String tableName) throws DoNotRetryIOException {
+ if (tableName.equals(FAILED_VIEWNAME)) {
// throwing anything other than instances of IOException result
// in this coprocessor being unloaded
// DoNotRetryIOException tells HBase not to retry this mutation
// multiple times
throw new DoNotRetryIOException();
- } else if (shouldSlowDown(c, miniBatchOp.getOperation(0))) {
+ } else if (tableName.startsWith(SLOW_VIEWNAME_PREFIX)) {
// simulate a slow write to SYSTEM.CATALOG
if (latch1 != null) {
latch1.countDown();
@@ -172,20 +194,7 @@ public class ViewIT extends SplitSystemCatalogIT {
}
}
}
-
- private boolean shouldFail(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) {
- TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
- return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
- && (Bytes.equals(FAILED_ROWKEY_BYTES, m.getRow()));
- }
-
- private boolean shouldSlowDown(ObserverContext<RegionCoprocessorEnvironment> c,
- Mutation m) {
- TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
- byte[] rowKeyPrefix = Arrays.copyOf(m.getRow(), SLOW_ROWKEY_PREFIX_BYTES.length);
- return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
- && (Bytes.equals(SLOW_ROWKEY_PREFIX_BYTES, rowKeyPrefix));
- }
+
}
@Test
@@ -598,9 +607,6 @@ public class ViewIT extends SplitSystemCatalogIT {
public void testViewAndTableAndDropCascadeWithIndexes() throws Exception {
// Setup - Tables and Views with Indexes
Connection conn = DriverManager.getConnection(getUrl());
- if (tableDDLOptions.length()!=0)
- tableDDLOptions+=",";
- tableDDLOptions+="IMMUTABLE_ROWS=true";
String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
String ddl = "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)" + tableDDLOptions;
conn.createStatement().execute(ddl);
@@ -1311,7 +1317,7 @@ public class ViewIT extends SplitSystemCatalogIT {
public void testChildViewCreationFails() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
- String fullViewName1 = SchemaUtil.getTableName(SCHEMA2, FAILED_VIEWNAME);
+ String fullViewName1 = FAILED_VIEWNAME;
String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
String tableDdl = "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)" + tableDDLOptions;
@@ -1343,9 +1349,7 @@ public class ViewIT extends SplitSystemCatalogIT {
public void testConcurrentViewCreationAndTableDrop() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
- String fullViewName1 =
- SchemaUtil.getTableName(SCHEMA2,
- SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName());
+ String fullViewName1 = SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName();
String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
latch1 = new CountDownLatch(1);
latch2 = new CountDownLatch(1);
@@ -1392,12 +1396,12 @@ public class ViewIT extends SplitSystemCatalogIT {
}
@Test
- public void testConcurrentAddColumn() throws Exception {
+ public void testConcurrentAddSameColumnDifferentType() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
+ latch1 = null;
+ latch2 = null;
String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
- String fullViewName1 =
- SchemaUtil.getTableName(SCHEMA2,
- SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName());
+ String fullViewName1 = SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName();
String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
// create base table
String tableDdl =
@@ -1422,10 +1426,10 @@ public class ViewIT extends SplitSystemCatalogIT {
}
});
- // add a column to the view in a separate thread (which will take
- // some time to complete)
+ // add a column with the same name and different type to the view in a separate thread
+ // (which will take some time to complete)
Future<Exception> future = executorService.submit(new AddColumnRunnable(fullViewName1));
- // wait till the thread makes the rpc to create the view
+ // wait till the thread makes the rpc to add the column
boolean result = latch1.await(2, TimeUnit.MINUTES);
if (!result) {
fail("The create view rpc look too long");
@@ -1451,6 +1455,82 @@ public class ViewIT extends SplitSystemCatalogIT {
conn.createStatement().execute(tableDdl);
}
}
+
+ @Test
+ public void testConcurrentAddDifferentColumn() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ latch1 = null;
+ latch2 = null;
+ String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName());
+ String fullViewName1 = SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName();
+ String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName());
+ String fullViewName3 = SchemaUtil.getTableName(SCHEMA4, generateUniqueName());
+ // create base table
+ String tableDdl =
+ "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)"
+ + tableDDLOptions;
+ conn.createStatement().execute(tableDdl);
+ // create a two views
+ String ddl =
+ "CREATE VIEW " + fullViewName1 + " (v2 VARCHAR) AS SELECT * FROM "
+ + fullTableName + " WHERE k = 6";
+ conn.createStatement().execute(ddl);
+ ddl =
+ "CREATE VIEW " + fullViewName3 + " (v2 VARCHAR) AS SELECT * FROM "
+ + fullTableName + " WHERE k = 7";
+ conn.createStatement().execute(ddl);
+
+ latch1 = new CountDownLatch(1);
+ latch2 = new CountDownLatch(1);
+ ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ t.setPriority(Thread.MIN_PRIORITY);
+ return t;
+ }
+ });
+
+ // add a column to a view in a separate thread (we slow this operation down)
+ Future<Exception> future = executorService.submit(new AddColumnRunnable(fullViewName1));
+ // wait till the thread makes the rpc to add the column
+ boolean result = latch1.await(2, TimeUnit.MINUTES);
+ if (!result) {
+ fail("The alter view rpc look too long");
+ }
+ tableDdl = "ALTER VIEW " + fullViewName3 + " ADD v4 INTEGER";
+ try {
+ // add a column to another view
+ conn.createStatement().execute(tableDdl);
+ if (columnEncoded) {
+ // this should fail as the previous add column is still not complete
+ fail(
+ "Adding columns to two different views concurrently where the base table uses encoded column should fail");
+ }
+ } catch (ConcurrentTableMutationException e) {
+ if (!columnEncoded) {
+ // this should not fail as we don't need to update the parent table for non
+ // column encoded tables
+ fail(
+ "Adding columns to two different views concurrently where the base table does not use encoded columns should succeed");
+ }
+ }
+ latch2.countDown();
+
+ Exception e = future.get();
+ // if the base table uses column encoding then the add column operation for fullViewName1 fails
+ assertNull(e);
+
+ // add a the same column to the another view to ensure that the cell used
+ // to prevent concurrent modifications was removed
+ ddl = "CREATE VIEW " + fullViewName2 + " (v2 VARCHAR) AS SELECT * FROM "
+ + fullTableName + " WHERE k = 6";
+ conn.createStatement().execute(ddl);
+ tableDdl = "ALTER VIEW " + fullViewName2 + " ADD v3 INTEGER";
+ conn.createStatement().execute(tableDdl);
+ }
+ }
private class CreateViewRunnable implements Callable<Exception> {
private final String fullTableName;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0cc455c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5e8a5dc..e748115 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -2208,8 +2208,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
MetaDataResponse response =
processRemoteRegionMutations(
PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES,
- childLinkMutations, fullTableName,
- MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK);
+ childLinkMutations, MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK);
if (response != null) {
done.run(response);
return;
@@ -2229,8 +2228,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
response =
processRemoteRegionMutations(
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
- remoteMutations, fullTableName,
- MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
+ remoteMutations, MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
+ clearParentTableFromCache(clientTimeStamp,
+ parentTable.getSchemaName() != null
+ ? parentTable.getSchemaName().getBytes()
+ : ByteUtil.EMPTY_BYTE_ARRAY,
+ parentTable.getName().getBytes());
if (response != null) {
done.run(response);
return;
@@ -2484,8 +2487,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
MetaDataResponse response =
processRemoteRegionMutations(
PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES,
- childLinkMutations, SchemaUtil.getTableName(schemaName, tableName),
- MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK);
+ childLinkMutations, MetaDataProtos.MutationCode.UNABLE_TO_CREATE_CHILD_LINK);
if (response!=null) {
done.run(response);
return;
@@ -2521,8 +2523,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
private MetaDataResponse processRemoteRegionMutations(byte[] systemTableName,
- List<Mutation> remoteMutations, String tableName,
- MetaDataProtos.MutationCode mutationCode) throws IOException {
+ List<Mutation> remoteMutations, MetaDataProtos.MutationCode mutationCode) throws IOException {
MetaDataResponse.Builder builder = MetaDataResponse.newBuilder();
try (Table hTable =
env.getTable(
@@ -2780,7 +2781,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
.getEncodingScheme() != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
processRemoteRegionMutations(
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, remoteMutations,
- fullTableName, MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
+ MetaDataProtos.MutationCode.UNABLE_TO_UPDATE_PARENT_TABLE);
+ clearParentTableFromCache(clientTimeStamp,
+ table.getParentSchemaName() != null
+ ? table.getParentSchemaName().getBytes()
+ : ByteUtil.EMPTY_BYTE_ARRAY,
+ table.getParentTableName().getBytes());
}
else {
String msg = "Found unexpected mutations while adding or dropping column to "+fullTableName;
@@ -2815,6 +2821,25 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return null; // impossible
}
}
+
+ /**
+ * Removes the table from the server side cache
+ */
+ private void clearParentTableFromCache(long clientTimeStamp, byte[] schemaName, byte[] tableName) throws SQLException {
+ // remove the parent table from the metadata cache as we just mutated the table
+ Properties props = new Properties();
+ if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
+ props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
+ }
+ try (PhoenixConnection connection =
+ QueryUtil.getConnectionOnServer(props, env.getConfiguration())
+ .unwrap(PhoenixConnection.class)) {
+ ConnectionQueryServices queryServices = connection.getQueryServices();
+ queryServices.clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, schemaName, tableName,
+ clientTimeStamp);
+ } catch (ClassNotFoundException e) {
+ }
+ }
private static boolean isDivergedView(PTable view) {
return view.getBaseColumnCount() == QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
@@ -3205,28 +3230,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
}
- if (type == PTableType.VIEW
- && EncodedColumnsUtil.usesEncodedColumnNames(table)) {
- /*
- * When adding a column to a view that uses encoded column name scheme, we
- * need to modify the CQ counters stored in the view's physical table. So to
- * make sure clients get the latest PTable, we need to invalidate the cache
- * entry.
- */
- invalidateList.add(new ImmutableBytesPtr(MetaDataUtil
- .getPhysicalTableRowForView(table)));
-
-
-
- }
+ boolean addingCol = false;
for (Mutation m : tableMetaData) {
byte[] key = m.getRow();
boolean addingPKColumn = false;
int pkCount = getVarChars(key, rowKeyMetaData);
+ // this means we have are adding a column
if (pkCount > COLUMN_NAME_INDEX
&& Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0
&& Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0) {
try {
+ addingCol = true;
if (pkCount > FAMILY_NAME_INDEX
&& rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
PColumnFamily family =
@@ -3291,6 +3305,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
tableMetaData.addAll(additionalTableMetadataMutations);
+ if (type == PTableType.VIEW
+ && EncodedColumnsUtil.usesEncodedColumnNames(table) && addingCol
+ && !table.isAppendOnlySchema()) {
+ // When adding a column to a view that uses encoded column name
+ // scheme, we need to modify the CQ counters stored in the view's
+ // physical table. So to make sure clients get the latest PTable, we
+ // need to invalidate the cache entry.
+ // If the table uses APPEND_ONLY_SCHEMA we use the position of the
+ // column as the encoded column qualifier and so we don't need to
+ // update the CQ counter in the view physical table (see
+ // PHOENIX-4737)
+ invalidateList.add(new ImmutableBytesPtr(
+ MetaDataUtil.getPhysicalTableRowForView(table)));
+ }
return null;
}
}, request.getClientVersion());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0cc455c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
index 15b0020..059bca1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixMetaDataCoprocessorHost.java
@@ -104,7 +104,7 @@ public class PhoenixMetaDataCoprocessorHost
/**
* Encapsulation of the environment of each coprocessor
*/
- static class PhoenixMetaDataControllerEnvironment extends CoprocessorHost.Environment
+ public static class PhoenixMetaDataControllerEnvironment extends CoprocessorHost.Environment
implements RegionCoprocessorEnvironment {
private RegionCoprocessorEnvironment env;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0cc455c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index c714eab..1114463 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -3321,6 +3321,7 @@ public class MetaDataClient {
String physicalTableName =
SchemaUtil.getTableNameFromFullName(physicalName.getString());
Set<String> acquiredColumnMutexSet = Sets.newHashSetWithExpectedSize(3);
+ boolean acquiredMutex = false;
try {
connection.setAutoCommit(false);
@@ -3599,17 +3600,26 @@ public class MetaDataClient {
}
}
- boolean acquiredMutex = true;
- for (PColumn pColumn : columns) {
- // acquire the mutex using the global physical table name to
- // prevent creating the same column on a table or view with
- // a conflicting type etc
- acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName,
- pColumn.getName().getString());
+ if (EncodedColumnsUtil.usesEncodedColumnNames(table)) {
+ // for tables that use column encoding acquire a mutex on the base table as we
+ // need to update the encoded column qualifier counter on the base table
+ acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName, null);
if (!acquiredMutex) {
throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
}
- acquiredColumnMutexSet.add(pColumn.getName().getString());
+ }
+ else {
+ for (PColumn pColumn : columns) {
+ // acquire the mutex using the global physical table name to
+ // prevent creating the same column on a table or view with
+ // a conflicting type etc
+ acquiredMutex = writeCell(null, physicalSchemaName, physicalTableName,
+ pColumn.getName().getString());
+ if (!acquiredMutex) {
+ throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
+ }
+ acquiredColumnMutexSet.add(pColumn.getName().getString());
+ }
}
MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, table, properties, colFamiliesForPColumnsToBeAdded, columns);
try {
@@ -3681,7 +3691,12 @@ public class MetaDataClient {
}
} finally {
connection.setAutoCommit(wasAutoCommit);
- if (!acquiredColumnMutexSet.isEmpty()) {
+ if (EncodedColumnsUtil.usesEncodedColumnNames(table) && acquiredMutex) {
+ // release the mutex on the physical table (used to prevent concurrent conflicting
+ // add column changes)
+ deleteCell(null, physicalSchemaName, physicalTableName, null);
+ }
+ else if (!acquiredColumnMutexSet.isEmpty()) {
for (String columnName : acquiredColumnMutexSet) {
// release the mutex (used to prevent concurrent conflicting add column changes)
deleteCell(null, physicalSchemaName, physicalTableName, columnName);