You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/10/26 20:25:52 UTC
[phoenix] branch 4.x updated: PHOENIX-6129 : Optimize tableExists()
call while retrieving correct MUTEX table
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 1872f5f PHOENIX-6129 : Optimize tableExists() call while retrieving correct MUTEX table
1872f5f is described below
commit 1872f5f2dc531d543ca6a9c111bb95966af6c6f8
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Tue Oct 27 01:27:29 2020 +0530
PHOENIX-6129 : Optimize tableExists() call while retrieving correct MUTEX table
Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
.../phoenix/query/ConnectionQueryServicesImpl.java | 52 +++++++++-------------
.../query/ConnectionQueryServicesImplTest.java | 43 ++++++++++++++++++
2 files changed, 64 insertions(+), 31 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 8fe3eec..62825d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4335,19 +4335,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
* making use of HBase's checkAndPut api.
*
* @return true if client won the race, false otherwise
- * @throws IOException
* @throws SQLException
*/
@VisibleForTesting
public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp)
- throws IOException,
- SQLException {
+ throws SQLException {
Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP);
- byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes();
- if(sysMutexPhysicalTableNameBytes == null) {
- throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
- getVersion(MIN_SYSTEM_TABLE_TIMESTAMP));
- }
if (!writeMutexCell(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, null, null)) {
throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp),
@@ -4360,15 +4353,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public boolean writeMutexCell(String tenantId, String schemaName, String tableName,
String columnName, String familyName) throws SQLException {
try {
- byte[] rowKey =
- columnName != null
- ? SchemaUtil.getColumnKey(tenantId, schemaName, tableName, columnName,
- familyName)
- : SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+ byte[] rowKey = columnName != null
+ ? SchemaUtil.getColumnKey(tenantId, schemaName, tableName,
+ columnName, familyName)
+ : SchemaUtil.getTableKey(tenantId, schemaName, tableName);
// at this point the system mutex table should have been created or
// an exception thrown
- byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes();
- try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) {
+ try (Table sysMutexTable = getSysMutexTable()) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = PhoenixDatabaseMetaData.SYSTEM_MUTEX_COLUMN_NAME_BYTES;
byte[] value = MUTEX_LOCKED;
@@ -4404,15 +4395,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
public void deleteMutexCell(String tenantId, String schemaName, String tableName,
String columnName, String familyName) throws SQLException {
try {
- byte[] rowKey =
- columnName != null
- ? SchemaUtil.getColumnKey(tenantId, schemaName, tableName, columnName,
- familyName)
- : SchemaUtil.getTableKey(tenantId, schemaName, tableName);
+ byte[] rowKey = columnName != null
+ ? SchemaUtil.getColumnKey(tenantId, schemaName, tableName,
+ columnName, familyName)
+ : SchemaUtil.getTableKey(tenantId, schemaName, tableName);
// at this point the system mutex table should have been created or
// an exception thrown
- byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes();
- try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) {
+ try (Table sysMutexTable = getSysMutexTable()) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = PhoenixDatabaseMetaData.SYSTEM_MUTEX_COLUMN_NAME_BYTES;
Delete delete = new Delete(rowKey);
@@ -4430,17 +4419,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
- private byte[] getSysMutexPhysicalTableNameBytes() throws IOException, SQLException {
- byte[] sysMutexPhysicalTableNameBytes = null;
- try(HBaseAdmin admin = getAdmin()) {
- if(admin.tableExists(PhoenixDatabaseMetaData.SYSTEM_MUTEX_HBASE_TABLE_NAME)) {
- sysMutexPhysicalTableNameBytes = PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES;
- } else if (admin.tableExists(TableName.valueOf(
- SchemaUtil.getPhysicalTableName(SYSTEM_MUTEX_NAME, props).getName()))) {
- sysMutexPhysicalTableNameBytes = SchemaUtil.getPhysicalTableName(SYSTEM_MUTEX_NAME, props).getName();
+ @VisibleForTesting
+ public Table getSysMutexTable() throws SQLException, IOException {
+ String table = SYSTEM_MUTEX_NAME;
+ TableName tableName = TableName.valueOf(table);
+ try (HBaseAdmin admin = getAdmin()) {
+ if (!admin.tableExists(tableName)) {
+ table = table.replace(QueryConstants.NAME_SEPARATOR,
+ QueryConstants.NAMESPACE_SEPARATOR);
+ tableName = TableName.valueOf(table);
}
+ return connection.getTable(tableName);
}
- return sysMutexPhysicalTableNameBytes;
}
private String addColumn(String columnsToAddSoFar, String columns) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
index 62299e1..da16dfe 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionQueryServicesImplTest.java
@@ -23,6 +23,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -41,12 +42,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.PhoenixIOException;
@@ -75,6 +79,15 @@ public class ConnectionQueryServicesImplTest {
@Mock
private ReadOnlyProps readOnlyProps;
+ @Mock
+ private ClusterConnection mockConn;
+
+ @Mock
+ private HTableInterface mockTable;
+
+ @Mock
+ private Configuration mockConf;
+
public static final HTableDescriptor SYS_TASK_TDB =
new HTableDescriptor(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME));
public static final HTableDescriptor SYS_TASK_TDB_SP =
@@ -89,12 +102,16 @@ public class ConnectionQueryServicesImplTest {
.getDeclaredField("props");
props.setAccessible(true);
props.set(mockCqs, readOnlyProps);
+ props = ConnectionQueryServicesImpl.class.getDeclaredField("connection");
+ props.setAccessible(true);
+ props.set(mockCqs, mockConn);
when(mockCqs.checkIfSysMutexExistsAndModifyTTLIfRequired(mockAdmin))
.thenCallRealMethod();
when(mockCqs.updateAndConfirmSplitPolicyForTask(SYS_TASK_TDB))
.thenCallRealMethod();
when(mockCqs.updateAndConfirmSplitPolicyForTask(SYS_TASK_TDB_SP))
.thenCallRealMethod();
+ when(mockCqs.getSysMutexTable()).thenCallRealMethod();
}
@SuppressWarnings("unchecked")
@@ -189,4 +206,30 @@ public class ConnectionQueryServicesImplTest {
e.getMessage());
}
}
+
+ @Test
+ public void testGetSysMutexTableWithName() throws Exception {
+ when(mockAdmin.tableExists(any(TableName.class))).thenReturn(true);
+ when(mockConn.getTable(TableName.valueOf("SYSTEM.MUTEX")))
+ .thenReturn(mockTable);
+ when(mockCqs.getAdmin()).thenReturn(mockAdmin);
+ assertSame(mockCqs.getSysMutexTable(), mockTable);
+ verify(mockAdmin, Mockito.times(1)).tableExists(any(TableName.class));
+ verify(mockConn, Mockito.times(1))
+ .getTable(TableName.valueOf("SYSTEM.MUTEX"));
+ verify(mockCqs, Mockito.times(1)).getAdmin();
+ }
+
+ @Test
+ public void testGetSysMutexTableWithNamespace() throws Exception {
+ when(mockAdmin.tableExists(any(TableName.class))).thenReturn(false);
+ when(mockConn.getTable(TableName.valueOf("SYSTEM:MUTEX")))
+ .thenReturn(mockTable);
+ when(mockCqs.getAdmin()).thenReturn(mockAdmin);
+ assertSame(mockCqs.getSysMutexTable(), mockTable);
+ verify(mockAdmin, Mockito.times(1)).tableExists(any(TableName.class));
+ verify(mockConn, Mockito.times(1))
+ .getTable(TableName.valueOf("SYSTEM:MUTEX"));
+ verify(mockCqs, Mockito.times(1)).getAdmin();
+ }
}