You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/02/07 23:19:01 UTC
[phoenix] branch master updated: PHOENIX-4940 Add tenantId parameter to index tool
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb4313 PHOENIX-4940 Add tenantId parameter to index tool
1bb4313 is described below
commit 1bb43138e47d8c5a72e12c21941b63908fa5069a
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Jan 31 11:04:03 2019 -0800
PHOENIX-4940 Add tenantId parameter to index tool
Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 112 +++++++++++++++++++--
.../apache/phoenix/mapreduce/index/IndexTool.java | 44 +++++---
2 files changed, 138 insertions(+), 18 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index dfe4634..c1a455a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -72,13 +74,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
private final boolean directApi;
private final String tableDDLOptions;
private final boolean useSnapshot;
+ private final boolean useTenantId;
public IndexToolIT(String transactionProvider, boolean mutable, boolean localIndex,
- boolean directApi, boolean useSnapshot) {
+ boolean directApi, boolean useSnapshot, boolean useTenantId) {
this.localIndex = localIndex;
this.transactional = transactionProvider != null;
this.directApi = directApi;
this.useSnapshot = useSnapshot;
+ this.useTenantId = useTenantId;
StringBuilder optionBuilder = new StringBuilder();
if (!mutable) {
optionBuilder.append(" IMMUTABLE_ROWS=true ");
@@ -124,13 +128,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
.isUnsupported(Feature.ALLOW_LOCAL_INDEX)) {
for (boolean directApi : Booleans) {
for (boolean useSnapshot : Booleans) {
- list.add(new Object[] { transactionProvider, mutable, localIndex, directApi, useSnapshot });
+ list.add(new Object[] { transactionProvider, mutable, localIndex, directApi, useSnapshot, false});
}
}
}
}
}
}
+ // Add a single parameter set with useTenantId=true
+ list.add(new Object[] { "", false, false, true, false, true});
return TestUtil.filterTxParamData(list,0);
}
@@ -229,6 +235,89 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
+ public void testIndexToolWithTenantId() throws Exception { // End-to-end check: IndexTool run with a tenant id builds an index on a tenant view
+ if (!useTenantId) { return;} // only the parameter set with useTenantId=true exercises this test
+ String tenantId = generateUniqueName();
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String viewTenantName = generateUniqueName();
+ String indexNameGlobal = generateUniqueName();
+ String indexNameTenant = generateUniqueName();
+ String viewIndexTableName = "_IDX_" + dataTableName; // table that is truncated and row-counted below; presumably the physical table backing the view index - confirm naming
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection connGlobal = DriverManager.getConnection(getUrl(), props); // opened before the tenant id is set on props
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ Connection connTenant = DriverManager.getConnection(getUrl(), props); // same props plus TENANT_ID_ATTRIB
+ String createTableStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL, ID INTEGER NOT NULL, NAME VARCHAR, "
+ + "CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
+ String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
+
+ String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME) VALUES('%s' , %d, '%s')";
+ String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+
+ try {
+ String tableStmtGlobal = String.format(createTableStr, dataTableName);
+ connGlobal.createStatement().execute(tableStmtGlobal); // base table is created through the global connection
+
+ String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
+ connTenant.createStatement().execute(viewStmtTenant); // view and its index are created through the tenant connection
+
+ String idxStmtTenant = String.format(createIndexStr, indexNameTenant, viewTenantName);
+ connTenant.createStatement().execute(idxStmtTenant);
+
+ connTenant.createStatement()
+ .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x"));
+ connTenant.commit();
+
+ runIndexTool(true, false, "", viewTenantName, indexNameTenant, tenantId, 0, // first build: tenant id supplied, exit status 0 expected
+ new String[0]);
+
+ String selectSql = String.format("SELECT ID FROM %s WHERE NAME='x'", viewTenantName);
+ ResultSet rs = connTenant.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertExplainPlan(false, actualExplainPlan, "", viewIndexTableName);
+ rs = connTenant.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1)); // ID of the single row upserted above
+ assertFalse(rs.next());
+
+ // Remove from tenant view index and build.
+ ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+ Admin admin = queryServices.getAdmin();
+ TableName tableName = TableName.valueOf(viewIndexTableName);
+ admin.disableTable(tableName);
+ admin.truncateTable(tableName, false); // empty the index table so the rebuild below has to repopulate it
+
+ runIndexTool(true, false, "", viewTenantName, indexNameTenant, tenantId, 0, // second build after the truncate, again expecting status 0
+ new String[0]);
+ Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+ int count = getUtility().countRows(htable);
+ // Confirm index has rows
+ assertTrue(count == 1); // one data row was upserted, so one index row is expected
+
+ selectSql = String.format("SELECT /*+ INDEX(%s) */ COUNT(*) FROM %s", indexNameTenant, viewTenantName);
+ rs = connTenant.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ String idxStmtGlobal =
+ String.format(createIndexStr, indexNameGlobal, dataTableName);
+ connGlobal.createStatement().execute(idxStmtGlobal); // index on the base table, created through the global connection
+
+ // run the index MR job this time with tenant id.
+ // We expect it to return -1 because indexTable is not correct for this tenant.
+ runIndexTool(true, false, schemaName, dataTableName, indexNameGlobal, // NOTE(review): schemaName is generated above but was not used when creating dataTableName; confirm the -1 comes from the tenant check and not from the schema-qualified name
+ tenantId, -1, new String[0]);
+
+ } finally {
+ connGlobal.close();
+ connTenant.close();
+ }
+ }
+
+ @Test
public void testSaltedVariableLengthPK() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -332,7 +421,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute(indexDDL);
// run with 50% sampling rate, split if data table more than 3 regions
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, "-sp", "50", "-spa", "3");
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,"-sp", "50", "-spa", "3");
assertEquals(targetNumRegions, admin.getTableRegions(indexTN).size());
List<Cell> values = new ArrayList<>();
@@ -361,7 +450,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
- String dataTable, String indxTable) {
+ String dataTable, String indxTable, String tenantId) {
final List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -381,6 +470,11 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
args.add("-snap");
}
+ if (tenantId != null) {
+ args.add("-tenant");
+ args.add(tenantId);
+ }
+
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args.toArray(new String[0]);
@@ -401,15 +495,21 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
+ }
+
+ public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+ String dataTableName, String indexTableName, String tenantId, int expectedStatus,
+ String... additionalArgs) throws Exception {
IndexTool indexingTool = new IndexTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
indexingTool.setConf(conf);
final String[] cmdArgs =
- getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
+ getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
cmdArgList.addAll(Arrays.asList(additionalArgs));
int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
- assertEquals(0, status);
+ assertEquals(expectedStatus, status);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index a69fa47..1e62838 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -149,6 +149,8 @@ public class IndexTool extends Configured implements Tool {
"Output path where the files are written");
private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false,
"If specified, uses Snapshots for async index building (optional)");
+ private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+ "If specified, uses Tenant connection for tenant view index building (optional)");
private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
@@ -162,6 +164,7 @@ public class IndexTool extends Configured implements Tool {
options.addOption(RUN_FOREGROUND_OPTION);
options.addOption(OUTPUT_PATH_OPTION);
options.addOption(SNAPSHOT_OPTION);
+ options.addOption(TENANT_ID_OPTION);
options.addOption(HELP_OPTION);
AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
options.addOption(AUTO_SPLIT_INDEX_OPTION);
@@ -247,15 +250,15 @@ public class IndexTool extends Configured implements Tool {
}
public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild,
- boolean useSnapshot) throws Exception {
+ boolean useSnapshot, String tenantId) throws Exception {
if (isPartialBuild) {
- return configureJobForPartialBuild(schemaName, dataTable);
+ return configureJobForPartialBuild(schemaName, dataTable, tenantId);
} else {
- return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot);
+ return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId);
}
}
- private Job configureJobForPartialBuild(String schemaName, String dataTable) throws Exception {
+ private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception {
final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
connection = ConnectionUtil.getInputConnection(configuration);
@@ -303,7 +306,10 @@ public class IndexTool extends Configured implements Tool {
ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
-
+ if (tenantId != null) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
+
//Prepare raw scan
Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
@@ -364,7 +370,7 @@ public class IndexTool extends Configured implements Tool {
}
- private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot)
+ private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId)
throws Exception {
final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
final String qIndexTable;
@@ -408,6 +414,9 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
indexColumns.toArray(new String[indexColumns.size()]));
+ if (tenantId != null) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
final List<ColumnInfo> columnMetadataList =
PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
@@ -536,14 +545,20 @@ public class IndexTool extends Configured implements Tool {
String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
- connection = ConnectionUtil.getInputConnection(configuration);
+ boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
byte[][] splitKeysBeforeJob = null;
boolean isLocalIndexBuild = false;
PTable pindexTable = null;
+ String tenantId = null;
+ if (useTenantId) {
+ tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+ configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ connection = ConnectionUtil.getInputConnection(configuration);
if (indexTable != null) {
- if (!isValidIndexTable(connection, qDataTable,indexTable)) {
+ if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
throw new IllegalArgumentException(String.format(
- " %s is not an index table for %s ", indexTable, qDataTable));
+ " %s is not an index table for %s for this connection", indexTable, qDataTable));
}
pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
@@ -581,7 +596,7 @@ public class IndexTool extends Configured implements Tool {
}
Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable,
- useDirectApi, isPartialBuild, useSnapshot);
+ useDirectApi, isPartialBuild, useSnapshot, tenantId);
if (!isForeground && useDirectApi) {
LOG.info("Running Index Build in Background - Submit async and exit");
job.submit();
@@ -742,18 +757,23 @@ public class IndexTool extends Configured implements Tool {
* @param connection
* @param masterTable
* @param indexTable
+ * @param tenantId
* @return
* @throws SQLException
*/
private boolean isValidIndexTable(final Connection connection, final String masterTable,
- final String indexTable) throws SQLException {
+ final String indexTable, final String tenantId) throws SQLException {
final DatabaseMetaData dbMetaData = connection.getMetaData();
final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
final String tableName = SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
ResultSet rs = null;
try {
- rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, false);
+ String catalog = "";
+ if (tenantId != null) {
+ catalog = tenantId;
+ }
+ rs = dbMetaData.getIndexInfo(catalog, schemaName, tableName, false, false);
while (rs.next()) {
final String indexName = rs.getString(6);
if (indexTable.equalsIgnoreCase(indexName)) {