You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/12/13 19:08:05 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5578 - CREATE TABLE
IF NOT EXISTS loads IndexRegionObserver on an existing table
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new 6b05c71 PHOENIX-5578 - CREATE TABLE IF NOT EXISTS loads IndexRegionObserver on an existing table
6b05c71 is described below
commit 6b05c71a2ad67fa6b4c4655cc984421247646e4f
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Thu Nov 21 16:52:39 2019 -0800
PHOENIX-5578 - CREATE TABLE IF NOT EXISTS loads IndexRegionObserver on an existing table
---
.../end2end/ParameterizedIndexUpgradeToolIT.java | 40 +++-
.../phoenix/end2end/index/IndexCoprocIT.java | 259 +++++++++++++++++++++
.../phoenix/query/ConnectionQueryServicesImpl.java | 79 ++++---
3 files changed, 342 insertions(+), 36 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index 79d6247..1df46a8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.index.GlobalIndexChecker;
@@ -58,6 +59,7 @@ import java.util.UUID;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.UPGRADE_OP;
+import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@RunWith(Parameterized.class)
@@ -138,6 +140,7 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
Boolean.toString(isNamespaceEnabled));
clientProps.put(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE,
Boolean.toString(isNamespaceEnabled));
+ clientProps.put(DROP_METADATA_ATTRIB, Boolean.toString(true));
serverProps.putAll(clientProps);
//To mimic the upgrade/rollback scenario, so that table creation uses old/new design
clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
@@ -227,23 +230,28 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
throws IOException {
if (mutable) {
for (String table : tableList) {
- Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertTrue("Can't find IndexRegionObserver for " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found Indexer on " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(Indexer.class.getName()));
}
}
for (String index : indexList) {
- Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(index))
+ Assert.assertTrue("Couldn't find GlobalIndexChecker on " + index,
+ admin.getTableDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
// Transactional indexes should not have new coprocessors
for (String index : TRANSACTIONAL_INDEXES_LIST) {
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(index))
+ Assert.assertFalse("Found GlobalIndexChecker on transactional index " + index,
+ admin.getTableDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
for (String table : TRANSACTIONAL_TABLE_LIST) {
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found IndexRegionObserver on transactional table",
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
}
}
@@ -252,14 +260,17 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
throws IOException {
if (mutable) {
for (String table : tableList) {
- Assert.assertTrue(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertTrue("Can't find Indexer for " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(Indexer.class.getName()));
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(table))
+ Assert.assertFalse("Found IndexRegionObserver on " + table,
+ admin.getTableDescriptor(TableName.valueOf(table))
.hasCoprocessor(IndexRegionObserver.class.getName()));
}
}
for (String index : indexList) {
- Assert.assertFalse(admin.getTableDescriptor(TableName.valueOf(index))
+ Assert.assertFalse("Found GlobalIndexChecker on " + index,
+ admin.getTableDescriptor(TableName.valueOf(index))
.hasCoprocessor(GlobalIndexChecker.class.getName()));
}
}
@@ -333,7 +344,7 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
}
@After
- public void cleanup() throws SQLException {
+ public void cleanup() throws IOException, SQLException {
//TEST.MOCK1,TEST1.MOCK2,TEST.MOCK3
conn.createStatement().execute("DROP INDEX INDEX1 ON TEST.MOCK1");
conn.createStatement().execute("DROP INDEX INDEX2 ON TEST.MOCK1");
@@ -369,5 +380,16 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
}
conn.close();
connTenant.close();
+ assertTableNotExists("TEST.MOCK1");
+ assertTableNotExists("TEST.MOCK2");
+ assertTableNotExists("TEST.MOCK3");
+ assertTableNotExists("TEST.MULTI_TENANT_TABLE");
+ }
+
+ /**
+  * Fails the test if the table that {@code SchemaUtil.getPhysicalTableName} resolves
+  * for {@code table} (honouring the namespace setting of this run) is still reported
+  * as existing by the HBase admin.
+  *
+  * @param table Phoenix table name, e.g. {@code TEST.MOCK1}
+  * @throws IOException if the existence check against HBase fails
+  */
+ private void assertTableNotExists(String table) throws IOException {
+     boolean stillThere = admin.tableExists(
+         SchemaUtil.getPhysicalTableName(Bytes.toBytes(table), isNamespaceEnabled));
+     Assert.assertFalse("Table " + table + " exists when it shouldn't", stillThere);
}
}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
new file mode 100644
index 0000000..cdf45e2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@RunWith(Parameterized.class)
+public class IndexCoprocIT extends ParallelStatsDisabledIT {
+ // Namespace mapping is never switched on by this suite; the flag is only read.
+ private boolean isNamespaceMapped = false;
+ // Whether base tables are created with MULTI_TENANT=true; fixed per parameterized run.
+ private final boolean isMultiTenant;
+
+ /**
+  * @param isMultiTenant value supplied by {@link #data()} for this run
+  */
+ public IndexCoprocIT(boolean isMultiTenant) {
+     this.isMultiTenant = isMultiTenant;
+ }
+ /**
+  * Supplies the parameter sets for the runner: every test executes once with
+  * {@code isMultiTenant = true} and once with {@code isMultiTenant = false}.
+  * The display name carries the class name and the parameter value so that
+  * failures can be attributed to the right run.
+  */
+ @Parameterized.Parameters(name = "IndexCoprocIT_multiTenant={0}")
+ public static Collection<Object[]> data() {
+     return Arrays.asList(new Object[][]{{true}, {false}});
+ }
+
+ /**
+  * Creates a base table and an index, swaps the base table over to the old
+  * {@code Indexer} coprocessor by hand, and then checks that issuing
+  * CREATE TABLE IF NOT EXISTS again leaves the old coprocessors in place.
+  */
+ @Test
+ public void testCreateCoprocs() throws Exception {
+     String schemaName = "S" + generateUniqueName();
+     String tableName = "T_" + generateUniqueName();
+     String indexName = "I_" + generateUniqueName();
+     String physicalTableName = SchemaUtil.getPhysicalHBaseTableName(schemaName, tableName,
+         isNamespaceMapped).getString();
+     String physicalIndexName = SchemaUtil.getPhysicalHBaseTableName(schemaName,
+         indexName, isNamespaceMapped).getString();
+     // try-with-resources releases the connection and the Admin even when an assertion fails
+     try (PhoenixConnection conn = getConnection();
+          Admin admin = conn.getQueryServices().getAdmin()) {
+         createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+         createIndexTable(schemaName, tableName, indexName);
+
+         HTableDescriptor baseDescriptor =
+             admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+         HTableDescriptor indexDescriptor =
+             admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+
+         assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+         assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+
+         // Strip the new-style coprocessors from both tables...
+         removeCoproc(IndexRegionObserver.class, baseDescriptor, admin);
+         removeCoproc(IndexRegionObserver.class, indexDescriptor, admin);
+         removeCoproc(GlobalIndexChecker.class, indexDescriptor, admin);
+
+         // ...and install the old Indexer on the base table instead.
+         Map<String, String> props = new HashMap<String, String>();
+         props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+         Indexer.enableIndexing(baseDescriptor, NonTxIndexBuilder.class,
+             props, 100);
+         admin.modifyTable(baseDescriptor.getTableName(), baseDescriptor);
+         baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+         indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+         assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+
+         // Re-issue the CREATE TABLE IF NOT EXISTS and re-read the descriptors.
+         // NOTE(review): multiTenant is hard-coded to true here rather than isMultiTenant;
+         // confirm that this is intended.
+         createBaseTable(schemaName, tableName, true, 0, null);
+         baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+         indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+         assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+     }
+ }
+
+ /**
+  * Creates a bare HBase table first (one column family, no coprocessors) and then
+  * runs the Phoenix CREATE TABLE / CREATE INDEX statements on top of it, checking
+  * that the new-style coprocessors are installed on the base table and the index.
+  */
+ @Test
+ public void testCreateOnExistingHBaseTable() throws Exception {
+     String schemaName = generateUniqueName();
+     String tableName = generateUniqueName();
+     String indexName = generateUniqueName();
+     byte[] cf = Bytes.toBytes("f");
+     // The Admin is managed alongside the connection so that it is closed as well.
+     try (PhoenixConnection conn = getConnection();
+          Admin admin = conn.getQueryServices().getAdmin()) {
+         TableName table = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
+             tableName, isNamespaceMapped).getString());
+         HTableDescriptor originalDesc = new HTableDescriptor(table);
+         originalDesc.addFamily(new HColumnDescriptor(cf));
+         admin.createTable(originalDesc);
+
+         createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+         HTableDescriptor baseDescriptor = admin.getTableDescriptor(table);
+         assertUsingNewCoprocs(baseDescriptor);
+
+         createIndexTable(schemaName, tableName, indexName);
+         baseDescriptor = admin.getTableDescriptor(table);
+         TableName indexTable = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(schemaName,
+             indexName, isNamespaceMapped).getString());
+         HTableDescriptor indexDescriptor = admin.getTableDescriptor(indexTable);
+         assertUsingNewCoprocs(baseDescriptor, indexDescriptor);
+     }
+ }
+
+ /**
+  * Checks that ALTER TABLE ... ADD and ALTER TABLE ... DROP COLUMN on the base table
+  * leave the indexing coprocessors of the base table and its index in place.
+  * The table descriptors are read again from HBase after every ALTER, so each
+  * assertion reflects the state produced by the preceding statement.
+  */
+ @Test
+ public void testAlterDoesntChangeCoprocs() throws Exception {
+     String schemaName = "S" + generateUniqueName();
+     String tableName = "T_" + generateUniqueName();
+     String indexName = "I_" + generateUniqueName();
+     TableName physicalTable = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(
+         schemaName, tableName, isNamespaceMapped).getString());
+     TableName physicalIndex = TableName.valueOf(SchemaUtil.getPhysicalHBaseTableName(
+         schemaName, indexName, isNamespaceMapped).getString());
+     // try-with-resources releases the connection and the Admin even when an assertion fails
+     try (PhoenixConnection conn = getConnection();
+          Admin admin = conn.getQueryServices().getAdmin()) {
+         createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
+         createIndexTable(schemaName, tableName, indexName);
+         assertCoprocsContains(IndexRegionObserver.class, admin.getTableDescriptor(physicalTable));
+         assertCoprocsContains(GlobalIndexChecker.class, admin.getTableDescriptor(physicalIndex));
+
+         String columnName = "foo";
+         addColumnToBaseTable(schemaName, tableName, columnName);
+         // fetch fresh descriptors: a descriptor object is a snapshot taken at fetch time
+         assertCoprocsContains(IndexRegionObserver.class, admin.getTableDescriptor(physicalTable));
+         assertCoprocsContains(GlobalIndexChecker.class, admin.getTableDescriptor(physicalIndex));
+
+         dropColumnToBaseTable(schemaName, tableName, columnName);
+         assertCoprocsContains(IndexRegionObserver.class, admin.getTableDescriptor(physicalTable));
+         assertCoprocsContains(GlobalIndexChecker.class, admin.getTableDescriptor(physicalIndex));
+     }
+ }
+ /**
+  * Asserts the old coprocessor layout: {@code Indexer} on the base table, no
+  * {@code IndexRegionObserver} on either table, and no {@code GlobalIndexChecker}
+  * on the index table.
+  */
+ private void assertUsingOldCoprocs(HTableDescriptor baseDescriptor,
+         HTableDescriptor indexDescriptor) {
+     // base table: old indexer present
+     assertCoprocsContains(Indexer.class, baseDescriptor);
+     // neither table may carry the new region observer
+     assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
+     assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
+     // index table: no read-repair checker
+     assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+
+ /**
+  * Asserts that the base table carries {@code IndexRegionObserver} and does not
+  * carry the old {@code Indexer}.
+  */
+ private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor) {
+     assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+     assertCoprocsNotContains(Indexer.class, baseDescriptor);
+ }
+
+ /**
+  * Asserts the new coprocessor layout on both tables: the base-table checks of the
+  * single-argument overload, then no {@code Indexer} and a {@code GlobalIndexChecker}
+  * on the index table.
+  */
+ private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor,
+         HTableDescriptor indexDescriptor) {
+     assertUsingNewCoprocs(baseDescriptor);
+     assertCoprocsNotContains(Indexer.class, indexDescriptor);
+     assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+ }
+
+ /**
+  * Fails the test unless {@code descriptor} lists the coprocessor implemented by
+  * {@code clazz}.
+  *
+  * @param clazz coprocessor class whose fully qualified name is looked up
+  * @param descriptor table descriptor to inspect
+  */
+ private void assertCoprocsContains(Class<?> clazz, HTableDescriptor descriptor) {
+     String expectedCoprocName = clazz.getName();
+     Assert.assertTrue("Could not find coproc " + expectedCoprocName +
+         " in descriptor " + descriptor, isCoprocPresent(descriptor, expectedCoprocName));
+ }
+
+ /**
+  * Fails the test if {@code descriptor} lists the coprocessor implemented by
+  * {@code clazz}.
+  *
+  * @param clazz coprocessor class whose fully qualified name must be absent
+  * @param descriptor table descriptor to inspect
+  */
+ private void assertCoprocsNotContains(Class<?> clazz, HTableDescriptor descriptor) {
+     String unexpectedCoprocName = clazz.getName();
+     Assert.assertFalse("Unexpectedly found coproc " + unexpectedCoprocName +
+         " in descriptor " + descriptor, isCoprocPresent(descriptor, unexpectedCoprocName));
+ }
+
+ /**
+  * Tells whether the coprocessor names reported by {@code descriptor} include
+  * {@code expectedCoprocName} (exact, case-sensitive match).
+  */
+ private boolean isCoprocPresent(HTableDescriptor descriptor, String expectedCoprocName) {
+     return descriptor.getCoprocessors().contains(expectedCoprocName);
+ }
+
+ /**
+  * Removes the coprocessor implemented by {@code clazz} from {@code descriptor} and
+  * pushes the modified descriptor to HBase through {@code admin}.
+  * {@code descriptor} itself is mutated.
+  *
+  * @throws Exception if modifying the table fails
+  */
+ private void removeCoproc(Class<?> clazz, HTableDescriptor descriptor, Admin admin) throws Exception {
+     descriptor.removeCoprocessor(clazz.getName());
+     admin.modifyTable(descriptor.getTableName(), descriptor);
+ }
+
+ /**
+  * Creates an index named {@code indexName} on column {@code v1} of the given table.
+  * The connection used for the DDL is closed before returning.
+  *
+  * @throws SQLException if the connection cannot be opened or the DDL fails
+  */
+ private void createIndexTable(String schemaName, String tableName, String indexName)
+         throws SQLException {
+     String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+     try (Connection conn = getConnection()) {
+         conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
+     }
+ }
+
+ /**
+  * Adds a {@code varchar(512)} column named {@code columnName} to the given table.
+  * The connection used for the DDL is closed before returning.
+  */
+ private void addColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
+     String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
+         " ADD " + columnName + " varchar(512)";
+     try (Connection conn = getConnection()) {
+         conn.createStatement().execute(ddl);
+     }
+ }
+
+ /**
+  * Drops the column named {@code columnName} from the given table.
+  * The connection used for the DDL is closed before returning.
+  */
+ private void dropColumnToBaseTable(String schemaName, String tableName, String columnName) throws Exception{
+     String ddl = "ALTER TABLE " + SchemaUtil.getTableName(schemaName, tableName) + " " +
+         " DROP COLUMN " + columnName;
+     try (Connection conn = getConnection()) {
+         conn.createStatement().execute(ddl);
+     }
+ }
+
+ /**
+  * Issues CREATE TABLE IF NOT EXISTS for the test table (creating the schema first
+  * when namespace mapping is on). The connection is closed even if a statement fails.
+  *
+  * @param multiTenant when true, MULTI_TENANT=true is added to the table options
+  * @param saltBuckets when non-null, appended as {@code salt_buckets=<value>}
+  * @param splits when non-null, appended as {@code splits=<value>}
+  * @throws SQLException if the connection cannot be opened or a statement fails
+  */
+ private void createBaseTable(String schemaName, String tableName, boolean multiTenant, Integer saltBuckets, String splits)
+         throws SQLException {
+     String ddl = "CREATE TABLE IF NOT EXISTS "
+         + SchemaUtil.getTableName(schemaName, tableName) + " (t_id VARCHAR NOT NULL,\n" +
+         "k1 VARCHAR NOT NULL,\n" +
+         "k2 INTEGER NOT NULL,\n" +
+         "v1 VARCHAR,\n" +
+         "v2 INTEGER,\n" +
+         "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+     // Options are comma-separated; a separator is only needed once one is present.
+     String ddlOptions = multiTenant ? "MULTI_TENANT=true" : "";
+     if (saltBuckets != null) {
+         ddlOptions = ddlOptions
+             + (ddlOptions.isEmpty() ? "" : ",")
+             + "salt_buckets=" + saltBuckets;
+     }
+     if (splits != null) {
+         ddlOptions = ddlOptions
+             + (ddlOptions.isEmpty() ? "" : ",")
+             + "splits=" + splits;
+     }
+     try (Connection conn = getConnection()) {
+         if (isNamespaceMapped) {
+             conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+         }
+         conn.createStatement().execute(ddl + ddlOptions);
+     }
+ }
+
+ /**
+  * Opens a new connection to the test cluster URL with the namespace-mapping
+  * property set from {@code isNamespaceMapped}. Each call yields a separate
+  * connection that the caller has to close.
+  */
+ private PhoenixConnection getConnection() throws SQLException{
+     Properties connectionProps = new Properties();
+     connectionProps.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
+         String.valueOf(isNamespaceMapped));
+     Connection opened = DriverManager.getConnection(getUrl(), connectionProps);
+     return (PhoenixConnection) opened;
+ }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b845156..959be3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -883,7 +883,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
- addCoprocessors(physicalTableName, newTableDescriptor, tableType, tableProps);
+ addCoprocessors(physicalTableName, newTableDescriptor, tableType, tableProps, existingDesc);
// PHOENIX-3072: Set index priority if this is a system table or index table
if (tableType == PTableType.SYSTEM) {
@@ -909,7 +909,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
- private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException {
+ private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor,
+ PTableType tableType, Map<String,Object> tableProps,
+ HTableDescriptor existingDesc) throws SQLException {
// The phoenix jar must be available on HBase classpath
int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
try {
@@ -978,23 +980,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
}
- if (indexRegionObserverEnabled) {
- if (descriptor.hasCoprocessor(Indexer.class.getName())) {
- descriptor.removeCoprocessor(Indexer.class.getName());
- }
- if (!descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- IndexRegionObserver.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
- }
- } else {
- if (descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
- descriptor.removeCoprocessor(IndexRegionObserver.class.getName());
- }
- if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
- Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
- opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
- Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ // Only touch the indexing coprocessors when this is the original CREATE
+ // statement. For an ALTER, or a CREATE TABLE IF NOT EXISTS against a table
+ // that already exists, leave them unaltered: upgrading or downgrading them
+ // is the job of the IndexUpgradeTool.
+ if (!doesPhoenixTableAlreadyExist(existingDesc)) {
+ if (indexRegionObserverEnabled) {
+ if (descriptor.hasCoprocessor(Indexer.class.getName())) {
+ descriptor.removeCoprocessor(Indexer.class.getName());
+ }
+ if (!descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ IndexRegionObserver.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
+ } else {
+ if (descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+ descriptor.removeCoprocessor(IndexRegionObserver.class.getName());
+ }
+ if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+ Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+ Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+ }
}
}
}
@@ -1075,7 +1083,24 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
return provider;
}
-
+
+ /**
+  * Heuristic for "this HBase table has already been through a Phoenix CREATE":
+  * true only when the descriptor carries the scan, ungrouped-aggregate and
+  * grouped-aggregate observers plus at least one of Indexer, IndexRegionObserver
+  * or GlobalIndexChecker.
+  *
+  * @param existingDesc descriptor of the existing table, or null when there is none
+  * @return false for a null descriptor or when any of the required coprocessors is missing
+  */
+ private boolean doesPhoenixTableAlreadyExist(HTableDescriptor existingDesc) {
+     if (existingDesc == null) {
+         return false;
+     }
+     // all three core Phoenix observers are required
+     if (!existingDesc.hasCoprocessor(ScanRegionObserver.class.getName())
+             || !existingDesc.hasCoprocessor(UngroupedAggregateRegionObserver.class.getName())
+             || !existingDesc.hasCoprocessor(GroupedAggregateRegionObserver.class.getName())) {
+         return false;
+     }
+     // any one of the indexing-related coprocessors is enough
+     return existingDesc.hasCoprocessor(Indexer.class.getName())
+         || existingDesc.hasCoprocessor(IndexRegionObserver.class.getName())
+         || existingDesc.hasCoprocessor(GlobalIndexChecker.class.getName());
+ }
+
private static interface RetriableOperation {
boolean checkForCompletion() throws TimeoutException, IOException;
String getOperationName();
@@ -1398,7 +1423,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
return null; // will never make it here
}
-
+
private static boolean hasTxCoprocessor(HTableDescriptor descriptor) {
for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
@@ -2365,7 +2390,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} else {
tableDescriptor.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue);
}
- this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
+ this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps, tableDescriptor);
}
private Map<HTableDescriptor, HTableDescriptor> separateAndValidateProperties(PTable table,
@@ -2986,7 +3011,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
}
-
+
// Available for testing
protected void setUpgradeRequired() {
this.upgradeRequired.set(true);
@@ -3026,12 +3051,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return String.format(ddl, props.getInt(LOG_SALT_BUCKETS_ATTRIB, QueryServicesOptions.DEFAULT_LOG_SALT_BUCKETS));
}
-
+
// Available for testing
protected String getChildLinkDDL() {
return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA);
}
-
+
protected String getMutexDDL() {
return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA);
}
@@ -3219,7 +3244,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
}
-
+
private boolean inspectIfAnyExceptionInChain(Throwable io, List<Class<? extends Exception>> ioList) {
boolean exceptionToIgnore = false;
for (Throwable t : Throwables.getCausalChain(io)) {
@@ -3924,7 +3949,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
metaConnection.rollback();
PColumn column = new PColumnImpl(PNameFactory.newName("COLUMN_QUALIFIER"),
PNameFactory.newName(DEFAULT_COLUMN_FAMILY_NAME), PVarbinary.INSTANCE, null, null, true, numColumns,
- SortOrder.ASC, null, null, false, null, false, false,
+ SortOrder.ASC, null, null, false, null, false, false,
Bytes.toBytes("COLUMN_QUALIFIER"), timestamp);
String upsertColumnMetadata = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +