Posted to commits@phoenix.apache.org by sk...@apache.org on 2019/12/12 18:19:56 UTC

[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5590 IndexTool always runs with direct

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new 5f6a625  PHOENIX-5590 IndexTool always runs with direct
5f6a625 is described below

commit 5f6a6257fac85f89538ecf3178cd2a5d144e2d36
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Tue Nov 26 14:31:06 2019 -0800

    PHOENIX-5590 IndexTool always runs with direct
    
    Signed-off-by: s.kadam <s....@salesforce.com>
---
 .../end2end/IndexToolForPartialBuildIT.java        |  1 +
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 23 ++---
 .../coprocessor/tasks/IndexRebuildTask.java        |  2 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 97 +++++-----------------
 4 files changed, 30 insertions(+), 93 deletions(-)
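
[Editor's note] The net effect of this patch is that IndexTool no longer offers the MapReduce bulk-load (HFile) path: the -direct flag becomes a deprecated no-op, -runfg is accepted without -direct, and the static run(...) entry point loses its directApi parameter. Below is a minimal, hedged sketch of calling the updated API after this commit; the schema, table, and index names are placeholders, not values from the patch.

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class RebuildIndexExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Arguments after the index name: useSnapshot, tenantId, disableBefore,
            // shouldDeleteBeforeRebuild, runForeground (the old directApi flag is gone).
            Map.Entry<Integer, Job> result = IndexTool.run(conf,
                    "MY_SCHEMA",      // schema name (placeholder)
                    "MY_TABLE",       // data table (placeholder)
                    "MY_TABLE_IDX",   // index table (placeholder)
                    false,            // useSnapshot
                    null,             // tenantId
                    false,            // disableBefore
                    false,            // shouldDeleteBeforeRebuild
                    true);            // runForeground: wait for the MR job to complete
            if (result.getKey() != 0) {
                throw new IllegalStateException("IndexTool exited with status " + result.getKey());
            }
        }
    }
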

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 50515b1..3027554 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -237,6 +237,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         // complete index rebuild
         args.add("-it");
         args.add(indexName);
+        args.add("-runfg");
         args.add("-op");
         args.add("/tmp/output/partialTable_");
         return args.toArray(new String[0]);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 866565d..0cdfc39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -38,11 +38,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -53,12 +50,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
-import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
 import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
 
 import org.apache.phoenix.query.QueryServices;
@@ -661,10 +656,9 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         args.add(indxTable);
         if (directApi) {
             args.add("-direct");
-            // Need to run this job in foreground for the test to be deterministic
-            args.add("-runfg");
         }
-
+        // Need to run this job in foreground for the test to be deterministic
+        args.add("-runfg");
         if (useSnapshot) {
             args.add("-snap");
         }
@@ -705,15 +699,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             boolean transactional = dataTable.isTransactional();
             boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType());
 
-            if (directApi) {
-                if ((localIndex || !transactional) && !useSnapshot) {
-                    assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class);
-                } else {
-                    assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class);
-                }
-            }
-            else {
-                assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class);
+            if ((localIndex || !transactional) && !useSnapshot) {
+                assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class);
+            } else {
+                assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class);
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index 168605f..91d7e67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -94,7 +94,7 @@ public class IndexRebuildTask extends BaseTask  {
             // Run index tool async.
             boolean runForeground = false;
             Map.Entry<Integer, Job> indexToolRes = IndexTool
-                    .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName, true,
+                    .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName,
                             false, taskRecord.getTenantId(), shouldDisable, rebuildAll, runForeground);
             int status = indexToolRes.getKey();
             if (status != 0) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 864036c..9a589b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -51,15 +51,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
@@ -124,7 +121,6 @@ public class IndexTool extends Configured implements Tool {
     private boolean isPartialBuild;
     private String qDataTable;
     private String qIndexTable;
-    private boolean useDirectApi;
     private boolean useSnapshot;
     private boolean isLocalIndexBuild;
     private boolean shouldDeleteBeforeRebuild;
@@ -146,7 +142,7 @@ public class IndexTool extends Configured implements Tool {
             "To build indexes for a data table from least disabledTimeStamp");
     
     private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
-            "If specified, we avoid the bulk load (optional)");
+            "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
 
     private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
 
@@ -232,27 +228,20 @@ public class IndexTool extends Configured implements Tool {
             throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory "
                     + "parameter");
         }
-
-		if (!(cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) || cmdLine.hasOption(DIRECT_API_OPTION.getOpt()))
-				&& !cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
-			throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " + "parameter");
-		}
         
 		if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
 			throw new IllegalStateException("Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
 		}
 
+        if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException("Index name should be passed unless it is a partial rebuild.");
+        }
+
 		if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) {
             throw new IllegalStateException(DELETE_ALL_AND_REBUILD_OPTION.getLongOpt() + " is not compatible with "
                     + PARTIAL_REBUILD_OPTION.getLongOpt());
         }
-        		
-        if (!(cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())
-                && cmdLine.hasOption(RUN_FOREGROUND_OPTION
-                        .getOpt())) {
-            throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt()
-                    + " is applicable only for " + DIRECT_API_OPTION.getLongOpt());
-        }
+
         boolean splitIndex = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()) || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
         if (splitIndex && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
             throw new IllegalStateException("Must pass an index name for the split index option");
@@ -304,10 +293,10 @@ public class IndexTool extends Configured implements Tool {
                             Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
                     configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
                 }
-                if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && pDataTable.isTransactional())) {
+                if (useSnapshot || (!isLocalIndexBuild && pDataTable.isTransactional())) {
                     configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
                             Long.toString(maxTimeRange));
-                    return configureJobForAysncIndex();
+                    return configureJobForAsyncIndex();
                 } else {
                     // Local and non-transactional global indexes to be built on the server side
                     // It is safe not to set CURRENT_SCN_VALUE for server side rebuilds, in order to make sure that
@@ -428,10 +417,7 @@ public class IndexTool extends Configured implements Tool {
             
         }
 
-        private Job configureJobForAysncIndex()
-
-                throws Exception {
-
+        private Job configureJobForAsyncIndex() throws Exception {
             String physicalIndexTable = pIndexTable.getPhysicalName().getString();
             final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
             final PostIndexDDLCompiler ddlCompiler =
@@ -455,13 +441,17 @@ public class IndexTool extends Configured implements Tool {
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
 
-            fs = outputPath.getFileSystem(configuration);
-            fs.delete(outputPath, true);
+            if (outputPath != null) {
+                fs = outputPath.getFileSystem(configuration);
+                fs.delete(outputPath, true);
+            }
             final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            FileOutputFormat.setOutputPath(job, outputPath);
+            if (outputPath != null) {
+                FileOutputFormat.setOutputPath(job, outputPath);
+            }
 
             if (!useSnapshot) {
                 PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery);
@@ -489,16 +479,11 @@ public class IndexTool extends Configured implements Tool {
             }
             TableMapReduceUtil.initCredentials(job);
             
-            if (useDirectApi) {
-                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
-                return configureSubmittableJobUsingDirectApi(job);
-            } else {
-                return configureRunnableJobUsingBulkLoad(job, outputPath);
-            }
+            job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+            return configureSubmittableJobUsingDirectApi(job);
         }
 
-        private Job configureJobForServerBuildIndex()
-                throws Exception {
+        private Job configureJobForServerBuildIndex() throws Exception {
             long indexRebuildQueryTimeoutMs =
                     configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
                             QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
@@ -554,26 +539,6 @@ public class IndexTool extends Configured implements Tool {
         }
 
         /**
-         * Submits the job and waits for completion.
-         * @param job
-         * @param outputPath
-         * @return
-         * @throws Exception
-         */
-        private Job configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception {
-            job.setMapperClass(PhoenixIndexImportMapper.class);
-            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            job.setMapOutputValueClass(KeyValue.class);
-            final Configuration configuration = job.getConfiguration();
-            final String physicalIndexTable =
-                    PhoenixConfigurationUtil.getPhysicalTableName(configuration);
-            try(final HTable htable = new HTable(configuration, physicalIndexTable)) {
-                HFileOutputFormat.configureIncrementalLoad(job, htable);
-            }
-            return job;
-        }
-        
-        /**
          * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
          * waits for the job completion based on runForeground parameter.
          * 
@@ -581,9 +546,7 @@ public class IndexTool extends Configured implements Tool {
          * @return
          * @throws Exception
          */
-        private Job configureSubmittableJobUsingDirectApi(Job job)
-                throws Exception {
-
+        private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception {
             job.setReducerClass(PhoenixIndexImportDirectReducer.class);
             Configuration conf = job.getConfiguration();
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -633,7 +596,6 @@ public class IndexTool extends Configured implements Tool {
             try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
                 pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
             }
-            useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
             useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
@@ -697,7 +659,7 @@ public class IndexTool extends Configured implements Tool {
             jobFactory = new JobFactory(connection, configuration, outputPath);
             job = jobFactory.getJob();
 
-            if (!isForeground && useDirectApi) {
+            if (!isForeground) {
                 LOGGER.info("Running Index Build in Background - Submit async and exit");
                 job.submit();
                 return 0;
@@ -707,18 +669,6 @@ public class IndexTool extends Configured implements Tool {
             boolean result = job.waitForCompletion(true);
             
             if (result) {
-                if (!useDirectApi && indexTable != null) {
-                    if (isLocalIndexBuild) {
-                        validateSplitForLocalIndex(splitKeysBeforeJob, htable);
-                    }
-                    LOGGER.info("Loading HFiles from {}", outputPath);
-                    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
-                    loader.doBulkLoad(outputPath, htable);
-                    htable.close();
-                    // Without direct API, we need to update the index state to ACTIVE from client.
-                    IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE);
-                    fs.delete(outputPath, true);
-                }
                 return 0;
             } else {
                 LOGGER.error("IndexTool job failed! Check logs for errors..");
@@ -919,7 +869,7 @@ public class IndexTool extends Configured implements Tool {
     }
 
     public static Map.Entry<Integer, Job> run(Configuration conf, String schemaName, String dataTable, String indexTable,
-            boolean directApi, boolean useSnapshot, String tenantId, boolean disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception {
+             boolean useSnapshot, String tenantId, boolean disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception {
         final List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -929,9 +879,6 @@ public class IndexTool extends Configured implements Tool {
         args.add(dataTable);
         args.add("-it");
         args.add(indexTable);
-        if (directApi) {
-            args.add("-direct");
-        }
 
         if (runForeground) {
             args.add("-runfg");