You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/02/11 22:28:02 UTC

[phoenix] branch 4.x-HBase-1.4 updated: Add tenantId param to IndexTool

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new 06b6d79  Add tenantId param to IndexTool
06b6d79 is described below

commit 06b6d79d4b4b541f2b13e03bd860090135b975f5
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Mon Feb 11 12:58:53 2019 -0800

    Add tenantId param to IndexTool
    
    Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 114 ++++++++++++++++++++-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  43 +++++---
 2 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index e096bb5..c185f39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -40,17 +40,21 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -73,13 +77,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     private final boolean directApi;
     private final String tableDDLOptions;
     private final boolean useSnapshot;
+    private final boolean useTenantId;
 
     public IndexToolIT(String transactionProvider, boolean mutable, boolean localIndex,
-            boolean directApi, boolean useSnapshot) {
+            boolean directApi, boolean useSnapshot, boolean useTenantId) {
         this.localIndex = localIndex;
         this.transactional = transactionProvider != null;
         this.directApi = directApi;
         this.useSnapshot = useSnapshot;
+        this.useTenantId = useTenantId;
         StringBuilder optionBuilder = new StringBuilder();
         if (!mutable) {
             optionBuilder.append(" IMMUTABLE_ROWS=true ");
@@ -125,13 +131,16 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                                 .isUnsupported(Feature.ALLOW_LOCAL_INDEX)) {
                         for (boolean directApi : Booleans) {
                             for (boolean useSnapshot : Booleans) {
-                                list.add(new Object[] { transactionProvider, mutable, localIndex, directApi, useSnapshot });
+                                list.add(new Object[] { transactionProvider, mutable, localIndex,
+                                        directApi, useSnapshot, false});
                             }
                         }
                     }
                 }
             }
         }
+        // Add a single parameter set that exercises the tenant-id path (useTenantId = true)
+        list.add(new Object[] { "", false, false, true, false, true});
         return TestUtil.filterTxParamData(list,0);
     }
 
@@ -230,6 +239,90 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testIndexToolWithTenantId() throws Exception {
+        if (!useTenantId) { return;}
+        String tenantId = generateUniqueName();
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String viewTenantName = generateUniqueName();
+        String indexNameGlobal = generateUniqueName();
+        String indexNameTenant = generateUniqueName();
+        String viewIndexTableName = "_IDX_" + dataTableName;
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection connGlobal = DriverManager.getConnection(getUrl(), props);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        Connection connTenant = DriverManager.getConnection(getUrl(), props);
+        String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL"
+                + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
+        String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
+
+        String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME) VALUES('%s' , %d, '%s')";
+        String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
+
+        try {
+            String tableStmtGlobal = String.format(createTblStr, dataTableName);
+            connGlobal.createStatement().execute(tableStmtGlobal);
+
+            String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
+            connTenant.createStatement().execute(viewStmtTenant);
+
+            String idxStmtTenant = String.format(createIndexStr, indexNameTenant, viewTenantName);
+            connTenant.createStatement().execute(idxStmtTenant);
+
+            connTenant.createStatement()
+                    .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x"));
+            connTenant.commit();
+
+            runIndexTool(true, false, "", viewTenantName, indexNameTenant,
+                    tenantId, 0, new String[0]);
+
+            String selectSql = String.format("SELECT ID FROM %s WHERE NAME='x'", viewTenantName);
+            ResultSet rs = connTenant.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(false, actualExplainPlan, "", viewIndexTableName);
+            rs = connTenant.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+
+            // Truncate the tenant view index table, then rebuild it with the index tool.
+            ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
+            Admin admin = queryServices.getAdmin();
+            TableName tableName = TableName.valueOf(viewIndexTableName);
+            admin.disableTable(tableName);
+            admin.truncateTable(tableName, false);
+
+            runIndexTool(true, false, "", viewTenantName, indexNameTenant,
+                    tenantId, 0, new String[0]);
+            Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+            int count = getUtility().countRows(htable);
+            // Confirm the rebuilt index table contains exactly one row
+            assertTrue(count == 1);
+
+            selectSql = String.format("SELECT /*+ INDEX(%s) */ COUNT(*) FROM %s",
+                    indexNameTenant, viewTenantName);
+            rs = connTenant.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+
+            String idxStmtGlobal =
+                    String.format(createIndexStr, indexNameGlobal, dataTableName);
+            connGlobal.createStatement().execute(idxStmtGlobal);
+
+            // Run the index MR job again, this time passing the tenant id together with the global index.
+            // We expect it to return -1 because the global index is not a valid index table for this tenant.
+            runIndexTool(true, false, schemaName, dataTableName, indexNameGlobal,
+                    tenantId, -1, new String[0]);
+
+        } finally {
+            connGlobal.close();
+            connTenant.close();
+        }
+    }
+
+    @Test
     public void testSaltedVariableLengthPK() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
@@ -362,7 +455,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
-            String dataTable, String indxTable) {
+            String dataTable, String indxTable, String tenantId) {
         final List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -382,6 +475,11 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             args.add("-snap");
         }
 
+        if (tenantId != null) {
+            args.add("-tenant");
+            args.add(tenantId);
+        }
+
         args.add("-op");
         args.add("/tmp/" + UUID.randomUUID().toString());
         return args.toArray(new String[0]);
@@ -402,15 +500,21 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
 
     public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
+        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
+    }
+
+    public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+            String dataTableName, String indexTableName, String tenantId, int expectedStatus,
+            String... additionalArgs) throws Exception {
         IndexTool indexingTool = new IndexTool();
         Configuration conf = new Configuration(getUtility().getConfiguration());
         conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         indexingTool.setConf(conf);
         final String[] cmdArgs =
-                getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
+                getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
         List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
         cmdArgList.addAll(Arrays.asList(additionalArgs));
         int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
-        assertEquals(0, status);
+        assertEquals(expectedStatus, status);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index a4c82be..dc361c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -147,6 +147,8 @@ public class IndexTool extends Configured implements Tool {
             "Output path where the files are written");
     private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false,
         "If specified, uses Snapshots for async index building (optional)");
+    private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
+        "If specified, uses Tenant connection for tenant view index building (optional)");
     private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
 
@@ -160,6 +162,7 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
         options.addOption(SNAPSHOT_OPTION);
+        options.addOption(TENANT_ID_OPTION);
         options.addOption(HELP_OPTION);
         AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
         options.addOption(AUTO_SPLIT_INDEX_OPTION);
@@ -245,15 +248,15 @@ public class IndexTool extends Configured implements Tool {
         }
 
         public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild,
-            boolean useSnapshot) throws Exception {
+            boolean useSnapshot, String tenantId) throws Exception {
             if (isPartialBuild) {
-                return configureJobForPartialBuild(schemaName, dataTable);
+                return configureJobForPartialBuild(schemaName, dataTable, tenantId);
             } else {
-                return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot);
+                return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId);
             }
         }
         
-        private Job configureJobForPartialBuild(String schemaName, String dataTable) throws Exception {
+        private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception {
             final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
             connection = ConnectionUtil.getInputConnection(configuration);
@@ -301,7 +304,10 @@ public class IndexTool extends Configured implements Tool {
             ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
             IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class));
             PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
-            
+            if (tenantId != null) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
+
             //Prepare raw scan 
             Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
             scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
@@ -362,7 +368,7 @@ public class IndexTool extends Configured implements Tool {
             
         }
 
-        private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot)
+        private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId)
                 throws Exception {
             final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             final String qIndexTable;
@@ -406,6 +412,9 @@ public class IndexTool extends Configured implements Tool {
             PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
                 indexColumns.toArray(new String[indexColumns.size()]));
+            if (tenantId != null) {
+                PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+            }
             final List<ColumnInfo> columnMetadataList =
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
@@ -532,14 +541,20 @@ public class IndexTool extends Configured implements Tool {
             String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
             boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
-            connection = ConnectionUtil.getInputConnection(configuration);
+            boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
             byte[][] splitKeysBeforeJob = null;
             boolean isLocalIndexBuild = false;
             PTable pindexTable = null;
+            String tenantId = null;
+            if (useTenantId) {
+                tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+                configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            }
+            connection = ConnectionUtil.getInputConnection(configuration);
             if (indexTable != null) {
-                if (!isValidIndexTable(connection, qDataTable,indexTable)) {
+                if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
                     throw new IllegalArgumentException(String.format(
-                        " %s is not an index table for %s ", indexTable, qDataTable));
+                        " %s is not an index table for %s for this connection", indexTable, qDataTable));
                 }
                 pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
                         ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
@@ -574,7 +589,7 @@ public class IndexTool extends Configured implements Tool {
 			}
             
             Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable,
-                    useDirectApi, isPartialBuild, useSnapshot);
+                    useDirectApi, isPartialBuild, useSnapshot, tenantId);
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and exit");
                 job.submit();
@@ -733,14 +748,18 @@ public class IndexTool extends Configured implements Tool {
      * @throws SQLException
      */
     private boolean isValidIndexTable(final Connection connection, final String masterTable,
-            final String indexTable) throws SQLException {
+            final String indexTable, final String tenantId) throws SQLException {
         final DatabaseMetaData dbMetaData = connection.getMetaData();
         final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
         final String tableName = SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
 
         ResultSet rs = null;
         try {
-            rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, false);
+            String catalog = "";
+            if (tenantId != null) {
+                catalog = tenantId;
+            }
+            rs = dbMetaData.getIndexInfo(catalog, schemaName, tableName, false, false);
             while (rs.next()) {
                 final String indexName = rs.getString(6);
                 if (indexTable.equalsIgnoreCase(indexName)) {