You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/12/16 17:36:15 UTC
[phoenix] branch master updated: PHOENIX-5578 - CREATE TABLE IF NOT
EXISTS loads IndexRegionObserver on an existing table
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 12b0597 PHOENIX-5578 - CREATE TABLE IF NOT EXISTS loads IndexRegionObserver on an existing table
12b0597 is described below
commit 12b05970317082e978f2066050a2ca12778c4718
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Thu Nov 21 16:52:39 2019 -0800
PHOENIX-5578 - CREATE TABLE IF NOT EXISTS loads IndexRegionObserver on an existing table
---
.../end2end/ParameterizedIndexUpgradeToolIT.java | 40 +++-
.../phoenix/end2end/index/IndexCoprocIT.java | 263 +++++++++++++++++++++
.../phoenix/query/ConnectionQueryServicesImpl.java | 84 ++++---
3 files changed, 349 insertions(+), 38 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index 8a7315d..c983659 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.index.GlobalIndexChecker;
@@ -59,6 +60,7 @@ import java.util.UUID;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP;
+import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@RunWith(Parameterized.class)
@@ -139,6 +141,7 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
Boolean.toString(isNamespaceEnabled));
clientProps.put(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE,
Boolean.toString(isNamespaceEnabled));
+ clientProps.put(DROP_METADATA_ATTRIB, Boolean.toString(true));
serverProps.putAll(clientProps);
//To mimic the upgrade/rollback scenario, so that table creation uses old/new design
clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
@@ -229,23 +232,28 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
throws IOException {
if (mutable) {
for (String table : tableList) {
- Assert.assertTrue(admin.getDescriptor(TableName.valueOf(table))
+ Assert.assertTrue("Can't find IndexRegionObserver for " + table,
+ admin.getDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
- Assert.assertFalse(admin.getDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found Indexer on " + table,
+ admin.getDescriptor(TableName.valueOf(table))
.hasCoprocessor(Indexer.class.getName()));
}
}
for (String index : indexList) {
- Assert.assertTrue(admin.getDescriptor(TableName.valueOf(index))
+ Assert.assertTrue("Couldn't find GlobalIndexChecker on " + index,
+ admin.getDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
// Transactional indexes should not have new coprocessors
for (String index : TRANSACTIONAL_INDEXES_LIST) {
- Assert.assertFalse(admin.getDescriptor(TableName.valueOf(index))
+ Assert.assertFalse("Found GlobalIndexChecker on transactional index " + index,
+ admin.getDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
for (String table : TRANSACTIONAL_TABLE_LIST) {
- Assert.assertFalse(admin.getDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found IndexRegionObserver on transactional table",
+ admin.getDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
}
}
@@ -254,14 +262,17 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
throws IOException {
if (mutable) {
for (String table : tableList) {
- Assert.assertTrue(admin.getDescriptor(TableName.valueOf(table))
+ Assert.assertTrue("Can't find Indexer for " + table,
+ admin.getDescriptor(TableName.valueOf(table))
.hasCoprocessor(Indexer.class.getName()));
- Assert.assertFalse(admin.getDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found IndexRegionObserver on " + table,
+ admin.getDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
}
}
for (String index : indexList) {
- Assert.assertFalse(admin.getDescriptor(TableName.valueOf(index))
+ Assert.assertFalse("Found GlobalIndexChecker on " + index,
+ admin.getDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
}
@@ -335,7 +346,7 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
}
@After
- public void cleanup() throws SQLException {
+ public void cleanup() throws IOException, SQLException {
//TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3
conn.createStatement().execute("DROP INDEX INDEX1 ON TEST.MOCK1");
conn.createStatement().execute("DROP INDEX INDEX2 ON TEST.MOCK1");
@@ -371,5 +382,16 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
}
conn.close();
connTenant.close();
+ assertTableNotExists("TEST.MOCK1");
+ assertTableNotExists("TEST.MOCK2");
+ assertTableNotExists("TEST.MOCK3");
+ assertTableNotExists("TEST.MULTI_TENANT_TABLE");
+ }
+
+ private void assertTableNotExists(String table) throws IOException {
+ TableName tableName =
+ SchemaUtil.getPhysicalTableName(Bytes.toBytes(table), isNamespaceEnabled);
+ Assert.assertFalse("Table " + table + " exists when it shouldn't",
+ admin.tableExists(tableName));
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
new file mode 100644
index 0000000..becd0fc
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@RunWith(Parameterized.class)
+public class IndexCoprocIT extends ParallelStatsDisabledIT {
+ private boolean isNamespaceMapped = false;
+ private boolean isMultiTenant = false;
+
+ public IndexCoprocIT(boolean isMultiTenant){
+ this.isMultiTenant = isMultiTenant;
+ }
+ @Parameterized.Parameters(name ="CreateIndexCoprocIT_mulitTenant={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]{{true}, {false}});
+ }
+
+ @Test
+ public void testCreateCoprocs() throws Exception {
+ String schemaName = "S" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String indexName = "I_" + generateUniqueName();
+ String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName,
+ isNamespaceMapped).getString();
+ String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ indexName, isNamespaceMapped).getString();
+ Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+
+ createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+ createIndexTable(schemaName, tableName, indexName);
+
+ TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+ TableDescriptorBuilder baseDescBuilder = TableDescriptorBuilder.newBuilder(baseDescriptor);
+ TableDescriptor indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+ TableDescriptorBuilder indexDescBuilder = TableDescriptorBuilder.newBuilder(indexDescriptor);
+
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+
+ removeCoproc(IndexRegionObserver.class, baseDescBuilder, admin);
+ removeCoproc(IndexRegionObserver.class, indexDescBuilder, admin);
+ removeCoproc(GlobalIndexChecker.class, indexDescBuilder, admin);
+
+ Map<String, String> props = new HashMap<String, String>();
+ props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(baseDescBuilder, NonTxIndexBuilder.class,
+ props, 100);
+ admin.modifyTable(baseDescBuilder.build());
+ baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+ indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+ assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+
+ createBaseTable(schemaName, tableName, true, 0, null);
+ baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+ indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+ assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+ }
+
+ @Test
+ public void testCreateOnExistingHBaseTable() throws Exception {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ byte[] cf = Bytes.toBytes("f");
+ try (PhoenixConnection conn = getConnection()){
+ TableName table = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ tableName, isNamespaceMapped).getString());
+ TableDescriptorBuilder originalDescBuilder = TableDescriptorBuilder.newBuilder(table);
+ ColumnFamilyDescriptorBuilder familyDescBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(cf);
+ originalDescBuilder.setColumnFamily(familyDescBuilder.build());
+ Admin admin = conn.getQueryServices().getAdmin();
+ admin.createTable(originalDescBuilder.build());
+ createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+ TableDescriptor baseDescriptor = admin.getDescriptor(table);
+ assertUsingNewCoprocs(baseDescriptor);
+ createIndexTable(schemaName, tableName, indexName);
+ baseDescriptor = admin.getDescriptor(table);
+ TableName indexTable = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ indexName, isNamespaceMapped).getString());
+ TableDescriptor indexDescriptor = admin.getDescriptor(indexTable);
+ assertUsingNewCoprocs(baseDescriptor, indexDescriptor);
+ }
+ }
+
+ @Test
+ public void testAlterDoesntChangeCoprocs() throws Exception {
+ String schemaName = "S" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String indexName = "I_" + generateUniqueName();
+ String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName,
+ isNamespaceMapped).getString();
+ String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ indexName, isNamespaceMapped).getString();
+ Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+
+ createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+ createIndexTable(schemaName, tableName, indexName);
+ TableDescriptor baseDescriptor = admin.getDescriptor(TableName.valueOf(physicalTableName));
+ TableDescriptor indexDescriptor = admin.getDescriptor(TableName.valueOf(physicalIndexName));
+
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ String columnName = "foo";
+ addColumnToBaseTable(schemaName, tableName, columnName);
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ dropColumnToBaseTable(schemaName, tableName, columnName);
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+ private void assertUsingOldCoprocs(TableDescriptor baseDescriptor,
+ TableDescriptor indexDescriptor) {
+ assertCoprocsContains(Indexer.class, baseDescriptor);
+ assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
+ assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+
+ private void assertUsingNewCoprocs(TableDescriptor baseDescriptor) {
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsNotContains(Indexer.class, baseDescriptor);
+ }
+
+ private void assertUsingNewCoprocs(TableDescriptor baseDescriptor,
+ TableDescriptor indexDescriptor) {
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsNotContains(Indexer.class, baseDescriptor);
+ assertCoprocsNotContains(Indexer.class, indexDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+
+ private void assertCoprocsContains(Class clazz, TableDescriptor descriptor) {
+ String expectedCoprocName = clazz.getName();
+ boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+ Assert.assertTrue("Could not find coproc " + expectedCoprocName +
+ " in descriptor " + descriptor,foundCoproc);
+ }
+
+ private void assertCoprocsNotContains(Class clazz, TableDescriptor descriptor) {
+ String expectedCoprocName = clazz.getName();
+ boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+ Assert.assertFalse("Could find coproc " + expectedCoprocName +
+ " in descriptor " + descriptor,foundCoproc);
+ }
+
+ private boolean isCoprocPresent(TableDescriptor descriptor, String expectedCoprocName) {
+ boolean foundCoproc = false;
+ for (String coprocName : descriptor.getCoprocessors()){
+ if (coprocName.equals(expectedCoprocName)){
+ foundCoproc = true;
+ break;
+ }
+ }
+ return foundCoproc;
+ }
+
+ private void removeCoproc(Class clazz, TableDescriptorBuilder descBuilder, Admin admin) throws Exception {
+ descBuilder.removeCoprocessor(clazz.getName());
+ admin.modifyTable(descBuilder.build());
+ }
+
+ private void createIndexTable(String schemaName, String tableName, String indexName)
+ throws SQLException {
+ Connection conn = getConnection();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
+ }
+
+ private void addColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
+ Connection conn = getConnection();
+ String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
+ " ADD " + columnName + " varchar(512)";
+ conn.createStatement().execute(ddl);
+ }
+
+ private void dropColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
+ Connection conn = getConnection();
+ String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
+ " DROP COLUMN " + columnName;
+ conn.createStatement().execute(ddl);
+ }
+
+ private void createBaseTable(String schemaName, String tableName, boolean multiTenant, Integer saltBuckets, String splits)
+ throws SQLException {
+ Connection conn = getConnection();
+ if (isNamespaceMapped) {
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+ }
+ String ddl = "CREATE TABLE IF NOT EXISTS "
+ + SchemaUtil.getTableName(schemaName, tableName) + " (t_id VARCHAR NOT NULL,\n" +
+ "k1 VARCHAR NOT NULL,\n" +
+ "k2 INTEGER NOT NULL,\n" +
+ "v1 VARCHAR,\n" +
+ "v2 INTEGER,\n" +
+ "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+ String ddlOptions = multiTenant ? "MULTI_TENANT=true" : "";
+ if (saltBuckets != null) {
+ ddlOptions = ddlOptions
+ + (ddlOptions.isEmpty() ? "" : ",")
+ + "salt_buckets=" + saltBuckets;
+ }
+ if (splits != null) {
+ ddlOptions = ddlOptions
+ + (ddlOptions.isEmpty() ? "" : ",")
+ + "splits=" + splits;
+ }
+ conn.createStatement().execute(ddl + ddlOptions);
+ conn.close();
+ }
+
+ private PhoenixConnection getConnection() throws SQLException{
+ Properties props = new Properties();
+ props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+ return (PhoenixConnection) DriverManager.getConnection(getUrl(),props);
+ }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f3f368e..0b6cb7b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -888,7 +888,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
- addCoprocessors(physicalTableName, tableDescriptorBuilder, tableType, tableProps);
+ addCoprocessors(physicalTableName, tableDescriptorBuilder,
+ tableType, tableProps, existingDesc);
// PHOENIX-3072: Set index priority if this is a system table or index table
if (tableType == PTableType.SYSTEM) {
@@ -914,7 +915,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
- private void addCoprocessors(byte[] tableName, TableDescriptorBuilder builder, PTableType tableType, Map<String,Object> tableProps) throws SQLException {
+ private void addCoprocessors(byte[] tableName, TableDescriptorBuilder builder,
+ PTableType tableType, Map<String,Object> tableProps,
+ TableDescriptor existingDesc) throws SQLException {
// The phoenix jar must be available on HBase classpath
int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
try {
@@ -985,23 +988,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (newDesc.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
builder.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
}
- if (indexRegionObserverEnabled) {
- if (newDesc.hasCoprocessor(Indexer.class.getName())) {
- builder.removeCoprocessor(Indexer.class.getName());
- }
- if (!newDesc.hasCoprocessor(IndexRegionObserver.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- IndexRegionObserver.enableIndexing(builder, PhoenixIndexBuilder.class, opts, priority);
- }
- } else {
- if (newDesc.hasCoprocessor(IndexRegionObserver.class.getName())) {
- builder.removeCoprocessor(IndexRegionObserver.class.getName());
- }
- if (!newDesc.hasCoprocessor(Indexer.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(builder, PhoenixIndexBuilder.class, opts, priority);
+ // we only want to mess with the indexing coprocs if we're on the original
+ // CREATE statement. Otherwise, if we're on an ALTER or CREATE TABLE
+ // IF NOT EXISTS of an existing table, we should leave them unaltered,
+ // because they should be upgraded or downgraded using the IndexUpgradeTool
+ if (!doesPhoenixTableAlreadyExist(existingDesc)) {
+ if (indexRegionObserverEnabled) {
+ if (newDesc.hasCoprocessor(Indexer.class.getName())) {
+ builder.removeCoprocessor(Indexer.class.getName());
+ }
+ if (!newDesc.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ IndexRegionObserver.enableIndexing(builder, PhoenixIndexBuilder.class, opts, priority);
+ }
+ } else {
+ if (newDesc.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ builder.removeCoprocessor(IndexRegionObserver.class.getName());
+ }
+ if (!newDesc.hasCoprocessor(Indexer.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(builder, PhoenixIndexBuilder.class, opts, priority);
+ }
}
}
}
@@ -1082,7 +1091,24 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
return provider;
}
-
+
+ private boolean doesPhoenixTableAlreadyExist(TableDescriptor existingDesc) {
+ //if the table descriptor already has Phoenix coprocs, we assume it's
+ //already gone through a Phoenix create statement once
+ if (existingDesc == null){
+ return false;
+ }
+ boolean hasScanObserver = existingDesc.hasCoprocessor(ScanRegionObserver.class.getName());
+ boolean hasUnAggObserver = existingDesc.hasCoprocessor(
+ UngroupedAggregateRegionObserver.class.getName());
+ boolean hasGroupedObserver = existingDesc.hasCoprocessor(
+ GroupedAggregateRegionObserver.class.getName());
+ boolean hasIndexObserver = existingDesc.hasCoprocessor(Indexer.class.getName())
+ || existingDesc.hasCoprocessor(IndexRegionObserver.class.getName())
+ || existingDesc.hasCoprocessor(GlobalIndexChecker.class.getName());
+ return hasScanObserver && hasUnAggObserver && hasGroupedObserver && hasIndexObserver;
+ }
+
private static interface RetriableOperation {
boolean checkForCompletion() throws TimeoutException, IOException;
String getOperationName();
@@ -1279,7 +1305,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
TableDescriptorBuilder newDesc = generateTableDescriptor(physicalTableName, existingDesc, tableType, props, families,
splits, isNamespaceMapped);
-
+
if (!tableExist) {
if (SchemaUtil.isSystemTable(physicalTableName) && !isUpgradeRequired() && (!isAutoUpgradeEnabled || isDoNotUpgradePropSet)) {
// Disallow creating the SYSTEM.CATALOG or SYSTEM:CATALOG HBase table
@@ -2130,7 +2156,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
* If the table was transitioned from non transactional to transactional, we need
* to also transition the index tables.
*/
-
+
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableDescriptor);
if (nonTxToTx) {
updateDescriptorForTx(table, tableProps, tableDescriptorBuilder, Boolean.TRUE.toString(),
@@ -2382,7 +2408,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} else {
tableDescriptorBuilder.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue);
}
- this.addCoprocessors(physicalTableName, tableDescriptorBuilder, tableType, tableProps);
+ this.addCoprocessors(physicalTableName, tableDescriptorBuilder, tableType, tableProps, null);
}
private Map<TableDescriptor, TableDescriptor> separateAndValidateProperties(PTable table,
@@ -3046,12 +3072,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
}
-
+
// Available for testing
protected String getChildLinkDDL() {
return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA);
}
-
+
protected String getMutexDDL() {
return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA);
}
@@ -3241,7 +3267,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
-
+
private boolean inspectIfAnyExceptionInChain(Throwable io, List<Class<? extends Exception>> ioList) {
boolean exceptionToIgnore = false;
for (Throwable t : Throwables.getCausalChain(io)) {
@@ -3353,12 +3379,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
for (TableDescriptor table : localIndexTables) {
if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null
&& table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) {
-
+
table=TableDescriptorBuilder.newBuilder(table).setValue(Bytes.toBytes(MetaDataUtil.PARENT_TABLE_KEY),
Bytes.toBytes(MetaDataUtil.getLocalIndexUserTableName(table.getTableName().getNameAsString()))).build();
// Explicitly disable, modify and enable the table to ensure
// co-location of data and index regions. If we just modify the
- // table descriptor when online schema change enabled may reopen
+ // table descriptor when online schema change enabled may reopen
// the region in same region server instead of following data region.
admin.disableTable(table.getTableName());
admin.modifyTable(table);
@@ -3949,7 +3975,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
metaConnection.rollback();
PColumn column = new PColumnImpl(PNameFactory.newName("COLUMN_QUALIFIER"),
PNameFactory.newName(DEFAULT_COLUMN_FAMILY_NAME), PVarbinary.INSTANCE, null, null, true, numColumns,
- SortOrder.ASC, null, null, false, null, false, false,
+ SortOrder.ASC, null, null, false, null, false, false,
Bytes.toBytes("COLUMN_QUALIFIER"), timestamp);
String upsertColumnMetadata = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +