You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/12/13 19:14:03 UTC
[phoenix] 01/02: PHOENIX-5578 - CREATE TABLE IF NOT EXISTS loads IndexRegionObserver on an existing table
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 28cbba9b652b7162646a9ca812cc56f052bd9d8b
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Thu Nov 21 16:52:39 2019 -0800
PHOENIX-5578 - CREATE TABLE IF NOT EXISTS loads IndexRegionObserver on an existing table
---
.../end2end/ParameterizedIndexUpgradeToolIT.java | 57 ++---
.../phoenix/end2end/index/IndexCoprocIT.java | 259 +++++++++++++++++++++
.../phoenix/query/ConnectionQueryServicesImpl.java | 65 ++++--
3 files changed, 335 insertions(+), 46 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index 1b164a4..3bf69a1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -18,9 +18,9 @@
package org.apache.phoenix.end2end;
import com.google.common.collect.Maps;
-import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.index.GlobalIndexChecker;
@@ -59,6 +59,7 @@ import java.util.UUID;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP;
+import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@RunWith(Parameterized.class)
@@ -139,6 +140,7 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
Boolean.toString(isNamespaceEnabled));
clientProps.put(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE,
Boolean.toString(isNamespaceEnabled));
+ clientProps.put(DROP_METADATA_ATTRIB, Boolean.toString(true));
serverProps.putAll(clientProps);
//To mimic the upgrade/rollback scenario, so that table creation uses old/new design
clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
@@ -228,23 +230,28 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
throws IOException {
if (mutable) {
for (String table : tableList) {
- Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertTrue("Can't find IndexRegionObserver for " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found Indexer on " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(Indexer.class.getName()));
}
}
for (String index : indexList) {
- Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(index))
+ Assert.assertTrue("Couldn't find GlobalIndexChecker on " + index,
+ admin.getTableDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
// Transactional indexes should not have new coprocessors
for (String index : TRANSACTIONAL_INDEXES_LIST) {
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(index))
+ Assert.assertFalse("Found GlobalIndexChecker on transactional index " + index,
+ admin.getTableDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
for (String table : TRANSACTIONAL_TABLE_LIST) {
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found IndexRegionObserver on transactional table",
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
}
}
@@ -253,14 +260,17 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
throws IOException {
if (mutable) {
for (String table : tableList) {
- Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertTrue("Can't find Indexer for " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(Indexer.class.getName()));
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found IndexRegionObserver on " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
}
}
for (String index : indexList) {
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(index))
+ Assert.assertFalse("Found GlobalIndexChecker on " + index,
+ admin.getTableDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
}
@@ -333,24 +343,8 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
validate(true);
}
- @Test
- public void testCommandLineParsing() {
-
- String outputFile = "/tmp/index_upgrade_" + UUID.randomUUID().toString();
- String [] args = {"-o", upgrade ? UPGRADE_OP : ROLLBACK_OP, "-tb",
- INPUT_LIST, "-lf", outputFile, "-d"};
- IndexUpgradeTool iut = new IndexUpgradeTool();
-
- CommandLine cmd = iut.parseOptions(args);
- iut.initializeTool(cmd);
- Assert.assertEquals(iut.getDryRun(),true);
- Assert.assertEquals(iut.getInputTables(), INPUT_LIST);
- Assert.assertEquals(iut.getOperation(), upgrade ? UPGRADE_OP : ROLLBACK_OP);
- Assert.assertEquals(iut.getLogFile(), outputFile);
- }
-
@After
- public void cleanup() throws SQLException {
+ public void cleanup() throws IOException, SQLException {
//TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3
conn.createStatement().execute("DROP INDEX INDEX1 ON TEST.MOCK1");
conn.createStatement().execute("DROP INDEX INDEX2 ON TEST.MOCK1");
@@ -386,5 +380,16 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
}
conn.close();
connTenant.close();
+ assertTableNotExists("TEST.MOCK1");
+ assertTableNotExists("TEST.MOCK2");
+ assertTableNotExists("TEST.MOCK3");
+ assertTableNotExists("TEST.MULTI_TENANT_TABLE");
+ }
+
+ private void assertTableNotExists(String table) throws IOException {
+ TableName tableName =
+ SchemaUtil.getPhysicalTableName(Bytes.toBytes(table), isNamespaceEnabled);
+ Assert.assertFalse("Table " + table + " exists when it shouldn't",
+ admin.tableExists(tableName));
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
new file mode 100644
index 0000000..c6b4e7b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@RunWith(Parameterized.class)
+public class IndexCoprocIT extends ParallelStatsDisabledIT {
+ private boolean isNamespaceMapped = false;
+ private boolean isMultiTenant = false;
+
+ public IndexCoprocIT(boolean isMultiTenant){
+ this.isMultiTenant = isMultiTenant;
+ }
+ @Parameterized.Parameters(name ="CreateIndexCoprocIT_mulitTenant={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][]{{true}, {false}});
+ }
+
+ @Test
+ public void testCreateCoprocs() throws Exception {
+ String schemaName = "S" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String indexName = "I_" + generateUniqueName();
+ String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName,
+ isNamespaceMapped).getString();
+ String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ indexName, isNamespaceMapped).getString();
+ Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+
+ createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+ createIndexTable(schemaName, tableName, indexName);
+
+ HTableDescriptor baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+
+ removeCoproc(IndexRegionObserver.class, baseDescriptor, admin);
+ removeCoproc(IndexRegionObserver.class, indexDescriptor, admin);
+ removeCoproc(GlobalIndexChecker.class, indexDescriptor, admin);
+
+ Map<String, String> props = new HashMap<String, String>();
+ props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(baseDescriptor, NonTxIndexBuilder.class,
+ props, 100);
+ admin.modifyTable(baseDescriptor.getTableName(), baseDescriptor);
+ baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+ indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+ assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+
+ createBaseTable(schemaName, tableName, true, 0, null);
+ baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+ indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+ assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+ }
+
+ @Test
+ public void testCreateOnExistingHBaseTable() throws Exception {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ byte[] cf = Bytes.toBytes("f");
+ try (PhoenixConnection conn = getConnection()){
+ TableName table = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ tableName, isNamespaceMapped).getString());
+ HTableDescriptor originalDesc = new HTableDescriptor(table);
+ originalDesc.addFamily(new HColumnDescriptor(cf));
+ Admin admin = conn.getQueryServices().getAdmin();
+ admin.createTable(originalDesc);
+ createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+ HTableDescriptor baseDescriptor = admin.getTableDescriptor(table);
+ assertUsingNewCoprocs(baseDescriptor);
+ createIndexTable(schemaName, tableName, indexName);
+ baseDescriptor = admin.getTableDescriptor(table);
+ TableName indexTable = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ indexName, isNamespaceMapped).getString());
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(indexTable);
+ assertUsingNewCoprocs(baseDescriptor, indexDescriptor);
+ }
+ }
+
+ @Test
+ public void testAlterDoesntChangeCoprocs() throws Exception {
+ String schemaName = "S" + generateUniqueName();
+ String tableName = "T_" + generateUniqueName();
+ String indexName = "I_" + generateUniqueName();
+ String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName,
+ isNamespaceMapped).getString();
+ String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
+ indexName, isNamespaceMapped).getString();
+ Admin admin = ((PhoenixConnection) getConnection()).getQueryServices().getAdmin();
+
+ createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+ createIndexTable(schemaName, tableName, indexName);
+ HTableDescriptor baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+ HTableDescriptor indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ String columnName = "foo";
+ addColumnToBaseTable(schemaName, tableName, columnName);
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ dropColumnToBaseTable(schemaName, tableName, columnName);
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+ private void assertUsingOldCoprocs(HTableDescriptor baseDescriptor,
+ HTableDescriptor indexDescriptor) {
+ assertCoprocsContains(Indexer.class, baseDescriptor);
+ assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
+ assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+
+ private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor) {
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsNotContains(Indexer.class, baseDescriptor);
+ }
+
+ private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor,
+ HTableDescriptor indexDescriptor) {
+ assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+ assertCoprocsNotContains(Indexer.class, baseDescriptor);
+ assertCoprocsNotContains(Indexer.class, indexDescriptor);
+ assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+
+ private void assertCoprocsContains(Class clazz, HTableDescriptor descriptor) {
+ String expectedCoprocName = clazz.getName();
+ boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+ Assert.assertTrue("Could not find coproc " + expectedCoprocName +
+ " in descriptor " + descriptor,foundCoproc);
+ }
+
+ private void assertCoprocsNotContains(Class clazz, HTableDescriptor descriptor) {
+ String expectedCoprocName = clazz.getName();
+ boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+ Assert.assertFalse("Could find coproc " + expectedCoprocName +
+ " in descriptor " + descriptor,foundCoproc);
+ }
+
+ private boolean isCoprocPresent(HTableDescriptor descriptor, String expectedCoprocName) {
+ boolean foundCoproc = false;
+ for (String coprocName : descriptor.getCoprocessors()){
+ if (coprocName.equals(expectedCoprocName)){
+ foundCoproc = true;
+ break;
+ }
+ }
+ return foundCoproc;
+ }
+
+ private void removeCoproc(Class clazz, HTableDescriptor descriptor, Admin admin) throws Exception {
+ descriptor.removeCoprocessor(clazz.getName());
+ admin.modifyTable(descriptor.getTableName(), descriptor);
+ }
+
+ private void createIndexTable(String schemaName, String tableName, String indexName)
+ throws SQLException {
+ Connection conn = getConnection();
+ String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
+ }
+
+ private void addColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
+ Connection conn = getConnection();
+ String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
+ " ADD " + columnName + " varchar(512)";
+ conn.createStatement().execute(ddl);
+ }
+
+ private void dropColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
+ Connection conn = getConnection();
+ String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
+ " DROP COLUMN " + columnName;
+ conn.createStatement().execute(ddl);
+ }
+
+ private void createBaseTable(String schemaName, String tableName, boolean multiTenant, Integer saltBuckets, String splits)
+ throws SQLException {
+ Connection conn = getConnection();
+ if (isNamespaceMapped) {
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+ }
+ String ddl = "CREATE TABLE IF NOT EXISTS "
+ + SchemaUtil.getTableName(schemaName, tableName) + " (t_id VARCHAR NOT NULL,\n" +
+ "k1 VARCHAR NOT NULL,\n" +
+ "k2 INTEGER NOT NULL,\n" +
+ "v1 VARCHAR,\n" +
+ "v2 INTEGER,\n" +
+ "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+ String ddlOptions = multiTenant ? "MULTI_TENANT=true" : "";
+ if (saltBuckets != null) {
+ ddlOptions = ddlOptions
+ + (ddlOptions.isEmpty() ? "" : ",")
+ + "salt_buckets=" + saltBuckets;
+ }
+ if (splits != null) {
+ ddlOptions = ddlOptions
+ + (ddlOptions.isEmpty() ? "" : ",")
+ + "splits=" + splits;
+ }
+ conn.createStatement().execute(ddl + ddlOptions);
+ conn.close();
+ }
+
+ private PhoenixConnection getConnection() throws SQLException{
+ Properties props = new Properties();
+ props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+ return (PhoenixConnection) DriverManager.getConnection(getUrl(),props);
+ }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index aa2c1df..fac17ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -823,7 +823,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
- addCoprocessors(physicalTableName, tableDescriptor, tableType, tableProps);
+ addCoprocessors(physicalTableName, tableDescriptor, tableType, tableProps, existingDesc);
// PHOENIX-3072: Set index priority if this is a system table or index table
if (tableType == PTableType.SYSTEM) {
@@ -849,7 +849,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
- private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException {
+ private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor,
+ PTableType tableType, Map<String,Object> tableProps,
+ HTableDescriptor existingDesc) throws SQLException {
// The phoenix jar must be available on HBase classpath
int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
try {
@@ -920,23 +922,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
}
- if (indexRegionObserverEnabled) {
- if (descriptor.hasCoprocessor(Indexer.class.getName())) {
- descriptor.removeCoprocessor(Indexer.class.getName());
- }
- if (!descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- IndexRegionObserver.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
- }
- } else {
- if (descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
- descriptor.removeCoprocessor(IndexRegionObserver.class.getName());
- }
- if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ // we only want to mess with the indexing coprocs if we're on the original
+ // CREATE statement. Otherwise, if we're on an ALTER or CREATE TABLE
+ // IF NOT EXISTS of an existing table, we should leave them unaltered,
+ // because they should be upgraded or downgraded using the IndexUpgradeTool
+ if (!doesPhoenixTableAlreadyExist(existingDesc)) {
+ if (indexRegionObserverEnabled) {
+ if (descriptor.hasCoprocessor(Indexer.class.getName())) {
+ descriptor.removeCoprocessor(Indexer.class.getName());
+ }
+ if (!descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ IndexRegionObserver.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
+ } else {
+ if (descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ descriptor.removeCoprocessor(IndexRegionObserver.class.getName());
+ }
+ if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
}
}
}
@@ -999,6 +1007,23 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
+ private boolean doesPhoenixTableAlreadyExist(HTableDescriptor existingDesc) {
+ //if the table descriptor already has Phoenix coprocs, we assume it's
+ //already gone through a Phoenix create statement once
+ if (existingDesc == null){
+ return false;
+ }
+ boolean hasScanObserver = existingDesc.hasCoprocessor(ScanRegionObserver.class.getName());
+ boolean hasUnAggObserver = existingDesc.hasCoprocessor(
+ UngroupedAggregateRegionObserver.class.getName());
+ boolean hasGroupedObserver = existingDesc.hasCoprocessor(
+ GroupedAggregateRegionObserver.class.getName());
+ boolean hasIndexObserver = existingDesc.hasCoprocessor(Indexer.class.getName())
+ || existingDesc.hasCoprocessor(IndexRegionObserver.class.getName())
+ || existingDesc.hasCoprocessor(GlobalIndexChecker.class.getName());
+ return hasScanObserver && hasUnAggObserver && hasGroupedObserver && hasIndexObserver;
+ }
+
private static interface RetriableOperation {
boolean checkForCompletion() throws TimeoutException, IOException;
String getOperationName();
@@ -2119,7 +2144,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} else {
tableDescriptor.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue);
}
- this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
+ this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps, tableDescriptor);
}
private Pair<HTableDescriptor,HTableDescriptor> separateAndValidateProperties(PTable table, Map<String, List<Pair<String, Object>>> properties,