You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/15 18:34:42 UTC
[14/40] phoenix git commit: PHOENIX-3757 System mutex table not being
created in SYSTEM namespace when namespace mapping is enabled
PHOENIX-3757 System mutex table not being created in SYSTEM namespace when namespace mapping is enabled
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7111d31e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7111d31e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7111d31e
Branch: refs/heads/4.x-HBase-1.2
Commit: 7111d31e036b8c6e419212695810e62864c19a3b
Parents: a8a1abc
Author: Karan Mehta <ka...@gmail.com>
Authored: Thu Oct 26 11:32:14 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 15 10:02:13 2017 -0800
----------------------------------------------------------------------
.../MigrateSystemTablesToSystemNamespaceIT.java | 402 +++++++++++++++++++
.../end2end/SystemTablePermissionsIT.java | 3 +-
.../phoenix/coprocessor/MetaDataProtocol.java | 3 +
.../exception/UpgradeInProgressException.java | 8 +-
.../query/ConnectionQueryServicesImpl.java | 184 ++++++---
.../org/apache/phoenix/util/UpgradeUtil.java | 44 +-
.../query/ConnectionQueryServicesImplTest.java | 9 +-
7 files changed, 572 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7111d31e/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
new file mode 100644
index 0000000..91e34be
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.exception.UpgradeInProgressException;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class MigrateSystemTablesToSystemNamespaceIT extends BaseTest {
+
+ private static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList( // HBase table names asserted when clients connect with SYSTEM-table mapping disabled ("SYSTEM.X")
+ "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION",
+ "SYSTEM.MUTEX"));
+ private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>( // HBase table names asserted when clients connect with SYSTEM-table mapping enabled ("SYSTEM:X")
+ Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION",
+ "SYSTEM:MUTEX"));
+ private static final String SCHEMA_NAME = "MIGRATETEST"; // schema created by createTable()
+ private static final String TABLE_NAME = // schema-qualified user table written by createTable() and read back by readTable()
+ SCHEMA_NAME + "." + MigrateSystemTablesToSystemNamespaceIT.class.getSimpleName().toUpperCase();
+ private static final int NUM_RECORDS = 5; // rows upserted by createTable() and expected by readTable()
+ // Per-test state: the mini cluster is created in doSetup() and shut down in tearDownMiniCluster().
+ private HBaseTestingUtility testUtil = null;
+ private Set<String> hbaseTables; // latest snapshot returned by getHBaseTables()
+
+ // Create Multiple users since Phoenix caches the connection per user
+ // Migration or upgrade code will run every time for each user.
+ final UserGroupInformation user1 =
+ UserGroupInformation.createUserForTesting("user1", new String[0]);
+ final UserGroupInformation user2 =
+ UserGroupInformation.createUserForTesting("user2", new String[0]);
+ final UserGroupInformation user3 =
+ UserGroupInformation.createUserForTesting("user3", new String[0]);
+ final UserGroupInformation user4 = // not referenced by any test in this class
+ UserGroupInformation.createUserForTesting("user4", new String[0]);
+
+
+    /**
+     * Creates a fresh {@link HBaseTestingUtility} for each test, turns on namespace
+     * mapping in its configuration, and then starts the mini cluster.
+     *
+     * @throws Exception if the mini cluster cannot be started
+     */
+    @Before
+    public final void doSetup() throws Exception {
+        testUtil = new HBaseTestingUtility();
+        // The server-side flag has to be in the configuration before the cluster boots.
+        enableNamespacesOnServer(testUtil.getConfiguration());
+        testUtil.startMiniCluster(1);
+    }
+
+    /**
+     * Shuts the mini cluster down after each test. Shutdown is best-effort:
+     * a failure here is deliberately not allowed to fail the test.
+     */
+    @After
+    public void tearDownMiniCluster() {
+        if (testUtil == null) {
+            return;
+        }
+        try {
+            testUtil.shutdownMiniCluster();
+            testUtil = null;
+        } catch (Exception ignored) {
+            // ignore
+        }
+    }
+
+ // Tests that client can create and read tables on a fresh HBase cluster with
+ // system namespace mapping enabled from the start
+ @Test
+ public void freshClientsCreateNamespaceMappedSystemTables() throws IOException, InterruptedException {
+ // Step 1: user1 connects with SYSTEM-table mapping enabled (createConnection also checks SYSTEM.CATALOG) and writes the test table.
+ user1.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingEnabled());
+ createTable(getClientPropertiesWithSystemMappingEnabled());
+ return null;
+ }
+ });
+ // Step 2: every "SYSTEM:X" table must now be present in HBase.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
+ // Step 3: the same user connects again with the same properties and reads the rows back.
+ user1.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingEnabled());
+ readTable(getClientPropertiesWithSystemMappingEnabled());
+ return null;
+ }
+ });
+
+ }
+
+ // Tests that NEWER clients can read tables on HBase cluster after system tables are migrated
+ @Test
+ public void migrateSystemTablesInExistingCluster() throws IOException, InterruptedException {
+ // Step 1: user1 connects with SYSTEM-table mapping disabled and writes the test table.
+ user1.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingDisabled());
+ createTable(getClientPropertiesWithSystemMappingDisabled());
+ return null;
+ }
+ });
+ // Step 2: the unmapped "SYSTEM.X" tables must exist at this point.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.containsAll(PHOENIX_SYSTEM_TABLES));
+ // Step 3: a different user (user2) connects with mapping enabled and must still be able to read the rows.
+ user2.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingEnabled());
+ readTable(getClientPropertiesWithSystemMappingEnabled());
+ return null;
+ }
+ });
+ // Step 4: after that connection the "SYSTEM:X" tables must be present.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
+ }
+
+ // Tests that OLDER clients fail after system tables are migrated
+ // Clients should be restarted with new properties which are consistent on both client and server
+ @Test
+ public void oldClientsAfterSystemTableMigrationShouldFail() throws IOException, InterruptedException {
+ // Step 1: user1 connects with SYSTEM-table mapping enabled.
+ user1.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingEnabled());
+ return null;
+ }
+ });
+ // Step 2: HBase must hold exactly the "SYSTEM:X" tables (same size and all members present).
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.size() == PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES.size());
+ assertTrue(hbaseTables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
+ // Step 3: user2 connecting with mapping disabled must throw; any Exception type is accepted. fail() raises an AssertionError, which this catch does not intercept.
+ try {
+ user2.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingDisabled());
+ return null;
+ }
+ });
+ fail("Client should not be able to connect to cluster with inconsistent SYSTEM table namespace properties");
+ } catch (Exception e) {
+ //ignore
+ }
+ // Step 4: the failed attempt must not have changed the set of HBase tables.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.size() == PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES.size());
+ assertTrue(hbaseTables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
+ }
+
+ // Tests that only one client can migrate the system table to system namespace
+ // Migrate process acquires lock in SYSMUTEX table
+ @Test
+ public void onlyOneClientCanMigrate() throws IOException, InterruptedException, SQLException {
+ // Step 1: user1 connects with SYSTEM-table mapping disabled.
+ user1.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingDisabled());
+ return null;
+ }
+ });
+ // Step 2: HBase must hold exactly the unmapped "SYSTEM.X" tables.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.size() == PHOENIX_SYSTEM_TABLES.size());
+ assertTrue(hbaseTables.containsAll(PHOENIX_SYSTEM_TABLES));
+ // Step 3: user2 takes the upgrade mutex directly through changeMutexLock(..., true).
+ user2.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Acquire Mutex Lock
+ changeMutexLock(getClientPropertiesWithSystemMappingDisabled(), true);
+ return null;
+ }
+ });
+ // Step 4: taking the mutex must not have changed the set of HBase tables.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.size() == PHOENIX_SYSTEM_TABLES.size());
+ assertTrue(hbaseTables.containsAll(PHOENIX_SYSTEM_TABLES));
+
+ try {
+ user3.doAs(new PrivilegedExceptionAction<Void>() { // Step 5: while the mutex is held, a mapping-enabled connection must be rejected
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingEnabled());
+ return null;
+ }
+ });
+ fail("Multiple clients should not be able to migrate simultaneously.");
+ } catch (Exception e) {
+ if(!(e.getCause() instanceof UpgradeInProgressException)) { // the expected failure is inspected via getCause() of the exception thrown by doAs
+ fail("UpgradeInProgressException expected since the user is trying to migrate when SYSMUTEX is locked.");
+ }
+ }
+ // Step 6: the rejected attempt must leave the unmapped tables untouched.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.size() == PHOENIX_SYSTEM_TABLES.size());
+ assertTrue(hbaseTables.containsAll(PHOENIX_SYSTEM_TABLES));
+ // Step 7: user2 releases the mutex through changeMutexLock(..., false).
+ user2.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Release Mutex Lock
+ changeMutexLock(getClientPropertiesWithSystemMappingDisabled(), false);
+ return null;
+ }
+ });
+ // Step 8: releasing the mutex alone must not change the set of HBase tables.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.size() == PHOENIX_SYSTEM_TABLES.size());
+ assertTrue(hbaseTables.containsAll(PHOENIX_SYSTEM_TABLES));
+ // Step 9: with the mutex free, user3's mapping-enabled connection must now succeed.
+ user3.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ createConnection(getClientPropertiesWithSystemMappingEnabled());
+ return null;
+ }
+ });
+ // Step 10: HBase must now hold exactly the "SYSTEM:X" tables.
+ hbaseTables = getHBaseTables();
+ assertTrue(hbaseTables.size() == PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES.size());
+ assertTrue(hbaseTables.containsAll(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES));
+ }
+
+ private void changeMutexLock(Properties clientProps, boolean acquire) throws SQLException, IOException {
+ ConnectionQueryServices services = null;
+ byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProps)) {
+ services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ if(acquire) {
+ assertTrue(((ConnectionQueryServicesImpl) services)
+ .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey));
+ } else {
+ ((ConnectionQueryServicesImpl) services).releaseUpgradeMutex(mutexRowKey);
+ }
+ }
+ }
+
+ private void enableNamespacesOnServer(Configuration conf) {
+ conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+ }
+
+ private Properties getClientPropertiesWithSystemMappingEnabled() {
+ Properties clientProps = new Properties();
+ clientProps.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+ clientProps.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.TRUE.toString());
+ return clientProps;
+ }
+
+ private Properties getClientPropertiesWithSystemMappingDisabled() {
+ Properties clientProps = new Properties();
+ clientProps.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+ clientProps.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.FALSE.toString());
+ return clientProps;
+ }
+
+ private Set<String> getHBaseTables() throws IOException {
+ Set<String> tables = new HashSet<>();
+ for (TableName tn : testUtil.getHBaseAdmin().listTableNames()) {
+ tables.add(tn.getNameAsString());
+ }
+ return tables;
+ }
+
+    /**
+     * Opens a connection with the given properties and checks the contents of
+     * SYSTEM.CATALOG through {@link #verifySyscatData}. Both the statement and the
+     * connection are closed before returning.
+     *
+     * @param clientProps connection properties to connect with
+     * @throws SQLException if connecting or querying fails
+     * @throws IOException  declared for callers; not thrown by the visible body
+     */
+    private void createConnection(Properties clientProps) throws SQLException, IOException {
+        try (Connection connection = DriverManager.getConnection(getJdbcUrl(), clientProps)) {
+            try (Statement statement = connection.createStatement()) {
+                verifySyscatData(clientProps, connection.toString(), statement);
+            }
+        }
+    }
+
+ private void createTable(Properties clientProps) throws SQLException {
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProps);
+ Statement stmt = conn.createStatement();) {
+ assertFalse(stmt.execute("DROP TABLE IF EXISTS " + TABLE_NAME));
+ stmt.execute("CREATE SCHEMA " + SCHEMA_NAME);
+ assertFalse(stmt.execute("CREATE TABLE " + TABLE_NAME
+ + "(pk INTEGER not null primary key, data VARCHAR)"));
+ try (PreparedStatement pstmt = conn.prepareStatement("UPSERT INTO "
+ + TABLE_NAME + " values(?, ?)")) {
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ pstmt.setInt(1, i);
+ pstmt.setString(2, Integer.toString(i));
+ assertEquals(1, pstmt.executeUpdate());
+ }
+ }
+ conn.commit();
+ }
+ }
+
+ private void readTable(Properties clientProps) throws SQLException {
+ try (Connection conn = DriverManager.getConnection(getJdbcUrl(), clientProps);
+ Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("SELECT pk, data FROM " + TABLE_NAME);
+ assertNotNull(rs);
+ int i = 0;
+ while (rs.next()) {
+ assertEquals(i, rs.getInt(1));
+ assertEquals(Integer.toString(i), rs.getString(2));
+ i++;
+ }
+ assertEquals(NUM_RECORDS, i);
+ }
+ }
+
+    /**
+     * Scans every row of SYSTEM.CATALOG and checks that it is consistent with the
+     * client's SYSTEM-table namespace-mapping setting:
+     * <ul>
+     *   <li>at least one row without an IS_NAMESPACE_MAPPED value must belong to the
+     *       SYSTEM schema, otherwise the test fails;</li>
+     *   <li>every SYSTEM row that has IS_NAMESPACE_MAPPED set and no COLUMN_NAME must
+     *       carry the value matching the client setting;</li>
+     *   <li>after ticking off each such table from the expected set, exactly one
+     *       expected name must be left over.</li>
+     * </ul>
+     * Schema comparisons are null-safe, so catalog rows with a null TABLE_SCHEM are
+     * skipped instead of raising a NullPointerException, and the result set is closed
+     * before returning.
+     *
+     * @param clientProps properties the connection was opened with; used to decide
+     *                    whether SYSTEM tables are expected to be namespace-mapped
+     * @param connName    unused; kept so existing callers keep compiling
+     * @param stmt        open statement used to query SYSTEM.CATALOG; not closed here
+     * @throws SQLException if the query or a column read fails
+     */
+    private void verifySyscatData(Properties clientProps, String connName, Statement stmt) throws SQLException {
+        @SuppressWarnings({"unchecked", "rawtypes"}) // Properties is a Map<Object, Object>; the raw cast is what the original code relied on
+        ReadOnlyProps props = new ReadOnlyProps((Map) clientProps);
+        boolean systemTablesMapped = SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props);
+        boolean systemSchemaExists = false;
+        // Working copies: names are removed as matching catalog rows are found.
+        Set<String> namespaceMappedSystemTablesSet = new HashSet<>(PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES);
+        Set<String> systemTablesSet = new HashSet<>(PHOENIX_SYSTEM_TABLES);
+
+        try (ResultSet rs = stmt.executeQuery("SELECT * FROM SYSTEM.CATALOG")) {
+            while (rs.next()) {
+                String namespaceMapped = rs.getString("IS_NAMESPACE_MAPPED");
+                String schemaName = rs.getString("TABLE_SCHEM");
+                // Constant-first equals: TABLE_SCHEM may be null for some catalog rows.
+                boolean inSystemSchema = PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME.equals(schemaName);
+
+                if (namespaceMapped == null) {
+                    // NOTE(review): any SYSTEM row lacking IS_NAMESPACE_MAPPED satisfies this,
+                    // not only a schema entry; confirm that is strict enough.
+                    systemSchemaExists = systemSchemaExists || inSystemSchema;
+                } else if (rs.getString("COLUMN_NAME") == null && inSystemSchema) {
+                    String tableName = rs.getString("TABLE_NAME");
+                    if (systemTablesMapped) {
+                        namespaceMappedSystemTablesSet.remove(String.valueOf(
+                                TableName.valueOf(schemaName + QueryConstants.NAMESPACE_SEPARATOR + tableName)));
+                        assertEquals("IS_NAMESPACE_MAPPED for " + schemaName + "." + tableName,
+                                Boolean.TRUE.toString(), namespaceMapped);
+                    } else {
+                        systemTablesSet.remove(String.valueOf(
+                                TableName.valueOf(schemaName + QueryConstants.NAME_SEPARATOR + tableName)));
+                        assertEquals("IS_NAMESPACE_MAPPED for " + schemaName + "." + tableName,
+                                Boolean.FALSE.toString(), namespaceMapped);
+                    }
+                }
+            }
+        }
+
+        if (!systemSchemaExists) {
+            fail(PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME + " entry doesn't exist in SYSTEM.CATALOG table.");
+        }
+
+        // The set will contain SYSMUTEX table since that table is not exposed in SYSCAT
+        if (systemTablesMapped) {
+            assertEquals("System tables missing from SYSTEM.CATALOG: " + namespaceMappedSystemTablesSet,
+                    1, namespaceMappedSystemTablesSet.size());
+        } else {
+            assertEquals("System tables missing from SYSTEM.CATALOG: " + systemTablesSet,
+                    1, systemTablesSet.size());
+        }
+    }
+
+ private String getJdbcUrl() {
+ return "jdbc:phoenix:localhost:" + testUtil.getZkCluster().getClientPort() + ":/hbase";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7111d31e/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
index 166b135..49202a4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablePermissionsIT.java
@@ -57,10 +57,9 @@ public class SystemTablePermissionsIT {
private static final Set<String> PHOENIX_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
"SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", "SYSTEM.FUNCTION",
"SYSTEM.MUTEX"));
- // PHOENIX-XXXX SYSTEM.MUTEX isn't being created in the SYSTEM namespace as it should be.
private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(
Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION",
- "SYSTEM.MUTEX"));
+ "SYSTEM:MUTEX"));
private static final String TABLE_NAME =
SystemTablePermissionsIT.class.getSimpleName().toUpperCase();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7111d31e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 09abde4..655068d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -68,6 +68,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
public static final long MIN_TABLE_TIMESTAMP = 0;
+ public static final long MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP = 0;
+ public static final String MIGRATION_IN_PROGRESS = "MigrationInProgress";
public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
public static final boolean DEFAULT_META_DATA_KEEP_DELETED_CELLS = true;
@@ -95,6 +97,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
// Key is the SYSTEM.CATALOG timestamp for the version and value is the version string.
private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = new TreeMap<>();
static {
+ TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, MIGRATION_IN_PROGRESS);
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0, "4.1.x");
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0, "4.2.0");
TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1, "4.2.1");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7111d31e/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java
index 08ae304..9c9f2a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/UpgradeInProgressException.java
@@ -18,10 +18,14 @@
package org.apache.phoenix.exception;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+
public class UpgradeInProgressException extends RetriableUpgradeException {
public UpgradeInProgressException(String upgradeFrom, String upgradeTo) {
- super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo
+ super((upgradeFrom.equals(MetaDataProtocol.MIGRATION_IN_PROGRESS) ?
+ "System Tables are concurrently being migrated to system namespace" :
+ "Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo)
+ ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS
.getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7111d31e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1a1e571..03cb7de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -190,7 +190,6 @@ import org.apache.phoenix.schema.EmptySequenceCacheException;
import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.MetaDataSplitPolicy;
-import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
@@ -334,7 +333,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return hbaseVersion >= PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION;
}
});
-
+
private PMetaData newEmptyMetaData() {
return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps()));
}
@@ -821,7 +820,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return false;
}
-
+
private void addCoprocessors(byte[] tableName, HTableDescriptor descriptor, PTableType tableType, Map<String,Object> tableProps) throws SQLException {
// The phoenix jar must be available on HBase classpath
int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
@@ -1616,7 +1615,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private void dropTable(byte[] tableNameToDelete) throws SQLException {
dropTables(Collections.<byte[]>singletonList(tableNameToDelete));
}
-
+
private void dropTables(final List<byte[]> tableNamesToDelete) throws SQLException {
SQLException sqlE = null;
try (HBaseAdmin admin = getAdmin()) {
@@ -2393,26 +2392,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
//check if the server is already updated and have namespace config properly set.
checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES);
}
- ensureSystemTablesUpgraded(ConnectionQueryServicesImpl.this.getProps());
- } else if (mappedSystemCatalogExists) { throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES)
- .setMessage("Cannot initiate connection as "
- + SchemaUtil.getPhysicalTableName(
- SYSTEM_CATALOG_NAME_BYTES, true)
- + " is found but client does not have "
- + IS_NAMESPACE_MAPPING_ENABLED + " enabled")
- .build().buildException(); }
- createSysMutexTable(admin);
+
+ // If SYSTEM tables exist, they are migrated to HBase SYSTEM namespace
+ // If they don't exist, this method will create HBase SYSTEM namespace and return
+ ensureSystemTablesMigratedToSystemNamespace(ConnectionQueryServicesImpl.this.getProps());
+ } else if (mappedSystemCatalogExists) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.INCONSISTENT_NAMESPACE_MAPPING_PROPERTIES)
+ .setMessage("Cannot initiate connection as "
+ + SchemaUtil.getPhysicalTableName(
+ SYSTEM_CATALOG_NAME_BYTES, true)
+ + " is found but client does not have "
+ + IS_NAMESPACE_MAPPING_ENABLED + " enabled")
+ .build().buildException();
+ }
}
Properties scnProps = PropertiesUtil.deepCopy(props);
scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
- try (PhoenixConnection metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl,
- scnProps, newEmptyMetaData())) {
+ try (HBaseAdmin hBaseAdmin = getAdmin();
+ PhoenixConnection metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl,
+ scnProps, newEmptyMetaData())) {
try {
- metaConnection.setRunningUpgrade(true);
+ metaConnection.setRunningUpgrade(true);
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
// Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
@@ -2434,8 +2438,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
return null;
}
+
+ // HBase Namespace SYSTEM is created by {@link ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps)} method
+ // This statement will create its entry in SYSCAT table, so that GRANT/REVOKE commands can work
+ // with SYSTEM Namespace. (See PHOENIX-4227 https://issues.apache.org/jira/browse/PHOENIX-4227)
+ if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
+ ConnectionQueryServicesImpl.this.getProps())) {
+ metaConnection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS "
+ + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
+ }
if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
- createOtherSystemTables(metaConnection);
+ createOtherSystemTables(metaConnection, hBaseAdmin);
} else if (isAutoUpgradeEnabled && !isDoNotUpgradePropSet) {
upgradeSystemTables(url, props);
}
@@ -2448,7 +2461,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
throw e;
} catch (Exception e) {
if (e instanceof SQLException) {
- initializationException = (SQLException)e;
+ initializationException = (SQLException) e;
} else {
// wrap every other exception into a SQLException
initializationException = new SQLException(e);
@@ -2467,13 +2480,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
} finally {
try {
- if (initializationException != null) { throw initializationException; }
+ if (initializationException != null) {
+ throw initializationException;
+ }
} finally {
initialized = true;
}
}
}
- }
+ }
return null;
}
});
@@ -2482,11 +2497,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
Throwables.propagate(e);
}
}
-
- private void createSysMutexTable(HBaseAdmin admin) throws IOException, SQLException {
+
+ void createSysMutexTable(HBaseAdmin admin, ReadOnlyProps props) throws IOException, SQLException {
try {
- final TableName mutexTableName = TableName.valueOf(
- PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES);
+ final TableName mutexTableName = SchemaUtil.getPhysicalTableName(
+ PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, props);
List<TableName> systemTables = getSystemTableNames(admin);
if (systemTables.contains(mutexTableName)) {
logger.debug("System mutex table already appears to exist, not creating it");
@@ -2498,7 +2513,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time
tableDesc.addFamily(columnDesc);
admin.createTable(tableDesc);
- try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+ try (HTableInterface sysMutexTable = getTable(mutexTableName.getName())) {
byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
Put put = new Put(mutexRowKey);
@@ -2514,7 +2529,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return Lists.newArrayList(admin.listTableNames(QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*"));
}
- private void createOtherSystemTables(PhoenixConnection metaConnection) throws SQLException {
+ private void createOtherSystemTables(PhoenixConnection metaConnection, HBaseAdmin hbaseAdmin) throws SQLException, IOException {
try {
metaConnection.createStatement().execute(QueryConstants.CREATE_SEQUENCE_METADATA);
} catch (TableAlreadyExistsException e) {
@@ -2526,8 +2541,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().execute(QueryConstants.CREATE_FUNCTION_METADATA);
} catch (TableAlreadyExistsException ignore) {}
+
+ // Catch the IOException to log the error message and then bubble it up for the client to retry.
+ try {
+ createSysMutexTable(hbaseAdmin, ConnectionQueryServicesImpl.this.getProps());
+ } catch (IOException exception) {
+ logger.error("Failed to created SYSMUTEX table. Upgrade or migration is not possible without it. Please retry.");
+ throw exception;
+ }
}
-
+
/**
* There is no other locking needed here since only one connection (on the same or different JVM) will be able to
* acquire the upgrade mutex via {@link #acquireUpgradeMutex(long, byte[])}.
@@ -2833,7 +2856,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
metaConnection.createStatement().executeUpdate(
QueryConstants.CREATE_STATS_TABLE_METADATA);
} catch (NewerTableAlreadyExistsException ignore) {
-
+
} catch (TableAlreadyExistsException e) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) {
@@ -2861,14 +2884,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
} catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {}
- if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
- ConnectionQueryServicesImpl.this.getProps())) {
- try {
- metaConnection.createStatement().executeUpdate(
- "CREATE SCHEMA IF NOT EXISTS "
- + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA);
- } catch (NewerSchemaAlreadyExistsException e) {}
- }
ConnectionQueryServicesImpl.this.upgradeRequired.set(false);
success = true;
} catch (UpgradeInProgressException | UpgradeNotRequiredException e) {
@@ -2905,14 +2920,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
} finally {
if (acquiredMutexLock) {
- releaseUpgradeMutex(mutexRowKey);
+ try {
+ releaseUpgradeMutex(mutexRowKey);
+ } catch (IOException e) {
+ logger.warn("Release of upgrade mutex failed ", e);
+ }
}
}
if (toThrow != null) { throw toThrow; }
}
}
}
-
+
// Special method for adding the column qualifier column for 4.10.
private PhoenixConnection addColumnQualifierColumn(PhoenixConnection oldMetaConnection, Long timestamp) throws SQLException {
Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
@@ -2935,7 +2954,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
metaConnection.rollback();
PColumn column = new PColumnImpl(PNameFactory.newName("COLUMN_QUALIFIER"),
PNameFactory.newName(DEFAULT_COLUMN_FAMILY_NAME), PVarbinary.INSTANCE, null, null, true, numColumns,
- SortOrder.ASC, null, null, false, null, false, false,
+ SortOrder.ASC, null, null, false, null, false, false,
Bytes.toBytes("COLUMN_QUALIFIER"));
String upsertColumnMetadata = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
@@ -3086,12 +3105,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- void ensureSystemTablesUpgraded(ReadOnlyProps props)
+ void ensureSystemTablesMigratedToSystemNamespace(ReadOnlyProps props)
throws SQLException, IOException, IllegalArgumentException, InterruptedException {
if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; }
+
+ boolean acquiredMutexLock = false;
+ byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
+
HTableInterface metatable = null;
try (HBaseAdmin admin = getAdmin()) {
- // Namespace-mapping is enabled at this point.
+ // SYSTEM namespace needs to be created via HBase APIs because "CREATE SCHEMA" statement tries to write its metadata
+ // in SYSTEM:CATALOG table. Without SYSTEM namespace, SYSTEM:CATALOG table cannot be created.
try {
ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME);
} catch (PhoenixIOException e) {
@@ -3101,7 +3126,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
// Regardless of the case 1 or 2, if the NS does not exist, we will error expectedly
// below. If the NS does exist and is mapped, the below check will exit gracefully.
}
-
+
List<TableName> tableNames = getSystemTableNames(admin);
// No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*"
if (tableNames.size() == 0) { return; }
@@ -3109,41 +3134,64 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (tableNames.size() > 5) {
logger.warn("Expected 5 system tables but found " + tableNames.size() + ":" + tableNames);
}
+
+ // Try acquiring a lock in SYSMUTEX table before migrating the tables since it involves disabling the table
+ // If we cannot acquire lock, it means some old client is either migrating SYSCAT or trying to upgrade the
+ // schema of SYSCAT table and hence it should not be interrupted
+ acquiredMutexLock = acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey);
+ if(acquiredMutexLock) {
+ logger.debug("Acquired lock in SYSMUTEX table for migrating SYSTEM tables to SYSTEM namespace");
+ }
+ // We will not reach here if we fail to acquire the lock, since it throws UpgradeInProgressException
+
+ // Handle the upgrade of SYSMUTEX table separately since it doesn't have any entries in SYSCAT
+ logger.info("Migrating SYSTEM.MUTEX table to SYSTEM namespace.");
+ String sysMutexSrcTableName = PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME;
+ String sysMutexDestTableName = SchemaUtil.getPhysicalName(sysMutexSrcTableName.getBytes(), props).getNameAsString();
+ UpgradeUtil.mapTableToNamespace(admin, sysMutexSrcTableName, sysMutexDestTableName, PTableType.SYSTEM);
+ tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME);
+
byte[] mappedSystemTable = SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName();
metatable = getTable(mappedSystemTable);
if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)) {
if (!admin.tableExists(mappedSystemTable)) {
+ logger.info("Migrating SYSTEM.CATALOG table to SYSTEM namespace.");
+ // Actual migration of SYSCAT table
UpgradeUtil.mapTableToNamespace(admin, metatable,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM,
null);
+ // Invalidate the client-side metadataCache
ConnectionQueryServicesImpl.this.removeTable(null,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
}
tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME);
}
- tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME);
for (TableName table : tableNames) {
+ logger.info(String.format("Migrating %s table to SYSTEM namespace.", table.getNameAsString()));
UpgradeUtil.mapTableToNamespace(admin, metatable, table.getNameAsString(), props, null, PTableType.SYSTEM,
null);
ConnectionQueryServicesImpl.this.removeTable(null, table.getNameAsString(), null,
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0);
}
- if (!tableNames.isEmpty()) {
- clearCache();
- }
+
+ // Clear the server-side metadataCache when all tables are migrated so that the new PTable can be loaded with NS mapping
+ clearCache();
} finally {
if (metatable != null) {
metatable.close();
}
+ if(acquiredMutexLock) {
+ releaseUpgradeMutex(mutexRowKey);
+ }
}
}
-
+
/**
* Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by
* making use of HBase's checkAndPut api.
- *
+ *
* @return true if client won the race, false otherwise
* @throws IOException
* @throws SQLException
@@ -3152,7 +3200,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException,
SQLException {
Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
- try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+
+ byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes();
+ if(sysMutexPhysicalTableNameBytes == null) {
+ throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
+ getVersion(MIN_SYSTEM_TABLE_TIMESTAMP));
+ }
+
+ try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = UPGRADE_MUTEX;
byte[] oldValue = UPGRADE_MUTEX_UNLOCKED;
@@ -3177,11 +3232,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return true;
}
}
-
+
@VisibleForTesting
- public boolean releaseUpgradeMutex(byte[] mutexRowKey) {
+ public boolean releaseUpgradeMutex(byte[] mutexRowKey) throws IOException, SQLException {
boolean released = false;
- try (HTableInterface sysMutexTable = getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
+
+ byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes();
+ if(sysMutexPhysicalTableNameBytes == null) {
+ // We should never really be in this situation where neither SYSMUTEX nor SYS:MUTEX exists
+ return true;
+ }
+
+ try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = UPGRADE_MUTEX;
byte[] expectedValue = UPGRADE_MUTEX_LOCKED;
@@ -3195,6 +3257,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return released;
}
+ private byte[] getSysMutexPhysicalTableNameBytes() throws IOException, SQLException {
+ byte[] sysMutexPhysicalTableNameBytes = null;
+ try(HBaseAdmin admin = getAdmin()) {
+ if(admin.tableExists(PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME)) {
+ sysMutexPhysicalTableNameBytes = PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES;
+ } else if (admin.tableExists(TableName.valueOf(
+ SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, props).getName()))) {
+ sysMutexPhysicalTableNameBytes = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME, props).getName();
+ }
+ }
+ return sysMutexPhysicalTableNameBytes;
+ }
+
private String addColumn(String columnsToAddSoFar, String columns) {
if (columnsToAddSoFar == null || columnsToAddSoFar.isEmpty()) {
return columns;
@@ -3662,6 +3737,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
SQLException sqlE = null;
HTableInterface htable = this.getTable(SchemaUtil
.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, this.getProps()).getName());
+
try {
htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
new Batch.Call<MetaDataService, ClearTableFromCacheResponse>() {
@@ -4041,7 +4117,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private void waitForRandomDuration() throws InterruptedException {
new CountDownLatch(1).await(random.nextInt(MAX_WAIT_TIME), MILLISECONDS);
}
-
+
private static class InternalRenewLeaseTaskException extends Exception {
public InternalRenewLeaseTaskException(String msg) {
super(msg);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7111d31e/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index c06912d..f5825b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -1735,26 +1735,7 @@ public class UpgradeUtil {
? "For system table " + QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE
+ " also needs to be enabled along with " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
: QueryServices.IS_NAMESPACE_MAPPING_ENABLED + " is not enabled"); }
- boolean srcTableExists=admin.tableExists(srcTableName);
- // we need to move physical table in actual namespace for TABLE and Index
- if (srcTableExists && (PTableType.TABLE.equals(pTableType)
- || PTableType.INDEX.equals(pTableType) || PTableType.SYSTEM.equals(pTableType))) {
- boolean destTableExists=admin.tableExists(destTableName);
- if (!destTableExists) {
- String snapshotName = QueryConstants.UPGRADE_TABLE_SNAPSHOT_PREFIX + srcTableName;
- logger.info("Disabling table " + srcTableName + " ..");
- admin.disableTable(srcTableName);
- logger.info(String.format("Taking snapshot %s of table %s..", snapshotName, srcTableName));
- admin.snapshot(snapshotName, srcTableName);
- logger.info(
- String.format("Restoring snapshot %s in destination table %s..", snapshotName, destTableName));
- admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(destTableName));
- logger.info(String.format("deleting old table %s..", srcTableName));
- admin.deleteTable(srcTableName);
- logger.info(String.format("deleting snapshot %s..", snapshotName));
- admin.deleteSnapshot(snapshotName);
- }
- }
+ mapTableToNamespace(admin, srcTableName, destTableName, pTableType);
byte[] tableKey = SchemaUtil.getTableKey(tenantId != null ? tenantId.getString() : null,
SchemaUtil.getSchemaNameFromFullName(phoenixTableName),
@@ -1778,6 +1759,29 @@ public class UpgradeUtil {
}
}
+ public static void mapTableToNamespace(HBaseAdmin admin, String srcTableName, String destTableName, PTableType pTableType) throws IOException {
+ boolean srcTableExists=admin.tableExists(srcTableName);
+ // we need to move physical table in actual namespace for TABLE and Index
+ if (srcTableExists && (PTableType.TABLE.equals(pTableType)
+ || PTableType.INDEX.equals(pTableType) || PTableType.SYSTEM.equals(pTableType))) {
+ boolean destTableExists=admin.tableExists(destTableName);
+ if (!destTableExists) {
+ String snapshotName = QueryConstants.UPGRADE_TABLE_SNAPSHOT_PREFIX + srcTableName;
+ logger.info("Disabling table " + srcTableName + " ..");
+ admin.disableTable(srcTableName);
+ logger.info(String.format("Taking snapshot %s of table %s..", snapshotName, srcTableName));
+ admin.snapshot(snapshotName, srcTableName);
+ logger.info(
+ String.format("Restoring snapshot %s in destination table %s..", snapshotName, destTableName));
+ admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(destTableName));
+ logger.info(String.format("deleting old table %s..", srcTableName));
+ admin.deleteTable(srcTableName);
+ logger.info(String.format("deleting snapshot %s..", snapshotName));
+ admin.deleteSnapshot(snapshotName);
+ }
+ }
+ }
+
/*
* Method to map existing phoenix table to a namespace. Should not be use if tables has views and indexes ,instead
* use map table utility in psql.py
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7111d31e/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index 73ddd2d..4708ffb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -46,7 +47,9 @@ public class ConnectionQueryServicesImplTest {
ConnectionQueryServicesImpl cqs = mock(ConnectionQueryServicesImpl.class);
// Invoke the real methods for these two calls
when(cqs.createSchema(any(List.class), anyString())).thenCallRealMethod();
- doCallRealMethod().when(cqs).ensureSystemTablesUpgraded(any(ReadOnlyProps.class));
+ doCallRealMethod().when(cqs).ensureSystemTablesMigratedToSystemNamespace(any(ReadOnlyProps.class));
+ // Do nothing for this method, just check that it was invoked later
+ doNothing().when(cqs).createSysMutexTable(any(HBaseAdmin.class), any(ReadOnlyProps.class));
// Spoof out this call so that ensureSystemTablesUpgrade() will return-fast.
when(cqs.getSystemTableNames(any(HBaseAdmin.class))).thenReturn(Collections.<TableName> emptyList());
@@ -54,10 +57,10 @@ public class ConnectionQueryServicesImplTest {
// Throw a special exception to check on later
doThrow(PHOENIX_IO_EXCEPTION).when(cqs).ensureNamespaceCreated(anyString());
- // Make sure that ensureSystemTablesUpgraded will try to migrate the system tables.
+ // Make sure that ensureSystemTablesMigratedToSystemNamespace will try to migrate the system tables.
Map<String,String> props = new HashMap<>();
props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
- cqs.ensureSystemTablesUpgraded(new ReadOnlyProps(props));
+ cqs.ensureSystemTablesMigratedToSystemNamespace(new ReadOnlyProps(props));
// Should be called after upgradeSystemTables()
// Proves that execution proceeded