You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2019/12/12 18:19:56 UTC
[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5590 IndexTool always runs with direct
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new 5f6a625 PHOENIX-5590 IndexTool always runs with direct
5f6a625 is described below
commit 5f6a6257fac85f89538ecf3178cd2a5d144e2d36
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Tue Nov 26 14:31:06 2019 -0800
PHOENIX-5590 IndexTool always runs with direct
Signed-off-by: s.kadam <s....@salesforce.com>
---
.../end2end/IndexToolForPartialBuildIT.java | 1 +
.../org/apache/phoenix/end2end/IndexToolIT.java | 23 ++---
.../coprocessor/tasks/IndexRebuildTask.java | 2 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 97 +++++-----------------
4 files changed, 30 insertions(+), 93 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 50515b1..3027554 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -237,6 +237,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
// complete index rebuild
args.add("-it");
args.add(indexName);
+ args.add("-runfg");
args.add("-op");
args.add("/tmp/output/partialTable_");
return args.toArray(new String[0]);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 866565d..0cdfc39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -38,11 +38,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -53,12 +50,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
-import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
import org.apache.phoenix.query.QueryServices;
@@ -661,10 +656,9 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
args.add(indxTable);
if (directApi) {
args.add("-direct");
- // Need to run this job in foreground for the test to be deterministic
- args.add("-runfg");
}
-
+ // Need to run this job in foreground for the test to be deterministic
+ args.add("-runfg");
if (useSnapshot) {
args.add("-snap");
}
@@ -705,15 +699,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
boolean transactional = dataTable.isTransactional();
boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType());
- if (directApi) {
- if ((localIndex || !transactional) && !useSnapshot) {
- assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class);
- } else {
- assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class);
- }
- }
- else {
- assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class);
+ if ((localIndex || !transactional) && !useSnapshot) {
+ assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class);
+ } else {
+ assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class);
}
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index 168605f..91d7e67 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -94,7 +94,7 @@ public class IndexRebuildTask extends BaseTask {
// Run index tool async.
boolean runForeground = false;
Map.Entry<Integer, Job> indexToolRes = IndexTool
- .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName, true,
+ .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName,
false, taskRecord.getTenantId(), shouldDisable, rebuildAll, runForeground);
int status = indexToolRes.getKey();
if (status != 0) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 864036c..9a589b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -51,15 +51,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
@@ -124,7 +121,6 @@ public class IndexTool extends Configured implements Tool {
private boolean isPartialBuild;
private String qDataTable;
private String qIndexTable;
- private boolean useDirectApi;
private boolean useSnapshot;
private boolean isLocalIndexBuild;
private boolean shouldDeleteBeforeRebuild;
@@ -146,7 +142,7 @@ public class IndexTool extends Configured implements Tool {
"To build indexes for a data table from least disabledTimeStamp");
private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
- "If specified, we avoid the bulk load (optional)");
+ "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
@@ -232,27 +228,20 @@ public class IndexTool extends Configured implements Tool {
throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory "
+ "parameter");
}
-
- if (!(cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) || cmdLine.hasOption(DIRECT_API_OPTION.getOpt()))
- && !cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
- throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " + "parameter");
- }
if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
throw new IllegalStateException("Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
}
+ if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+ throw new IllegalStateException("Index name should be passed unless it is a partial rebuild.");
+ }
+
if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) {
throw new IllegalStateException(DELETE_ALL_AND_REBUILD_OPTION.getLongOpt() + " is not compatible with "
+ PARTIAL_REBUILD_OPTION.getLongOpt());
}
-
- if (!(cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())
- && cmdLine.hasOption(RUN_FOREGROUND_OPTION
- .getOpt())) {
- throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt()
- + " is applicable only for " + DIRECT_API_OPTION.getLongOpt());
- }
+
boolean splitIndex = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()) || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
if (splitIndex && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
throw new IllegalStateException("Must pass an index name for the split index option");
@@ -304,10 +293,10 @@ public class IndexTool extends Configured implements Tool {
Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
}
- if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && pDataTable.isTransactional())) {
+ if (useSnapshot || (!isLocalIndexBuild && pDataTable.isTransactional())) {
configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
Long.toString(maxTimeRange));
- return configureJobForAysncIndex();
+ return configureJobForAsyncIndex();
} else {
// Local and non-transactional global indexes to be built on the server side
// It is safe not to set CURRENT_SCN_VALUE for server side rebuilds, in order to make sure that
@@ -428,10 +417,7 @@ public class IndexTool extends Configured implements Tool {
}
- private Job configureJobForAysncIndex()
-
- throws Exception {
-
+ private Job configureJobForAsyncIndex() throws Exception {
String physicalIndexTable = pIndexTable.getPhysicalName().getString();
final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
final PostIndexDDLCompiler ddlCompiler =
@@ -455,13 +441,17 @@ public class IndexTool extends Configured implements Tool {
PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);
- fs = outputPath.getFileSystem(configuration);
- fs.delete(outputPath, true);
+ if (outputPath != null) {
+ fs = outputPath.getFileSystem(configuration);
+ fs.delete(outputPath, true);
+ }
final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
final Job job = Job.getInstance(configuration, jobName);
job.setJarByClass(IndexTool.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- FileOutputFormat.setOutputPath(job, outputPath);
+ if (outputPath != null) {
+ FileOutputFormat.setOutputPath(job, outputPath);
+ }
if (!useSnapshot) {
PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery);
@@ -489,16 +479,11 @@ public class IndexTool extends Configured implements Tool {
}
TableMapReduceUtil.initCredentials(job);
- if (useDirectApi) {
- job.setMapperClass(PhoenixIndexImportDirectMapper.class);
- return configureSubmittableJobUsingDirectApi(job);
- } else {
- return configureRunnableJobUsingBulkLoad(job, outputPath);
- }
+ job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+ return configureSubmittableJobUsingDirectApi(job);
}
- private Job configureJobForServerBuildIndex()
- throws Exception {
+ private Job configureJobForServerBuildIndex() throws Exception {
long indexRebuildQueryTimeoutMs =
configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
@@ -554,26 +539,6 @@ public class IndexTool extends Configured implements Tool {
}
/**
- * Submits the job and waits for completion.
- * @param job
- * @param outputPath
- * @return
- * @throws Exception
- */
- private Job configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception {
- job.setMapperClass(PhoenixIndexImportMapper.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
- final Configuration configuration = job.getConfiguration();
- final String physicalIndexTable =
- PhoenixConfigurationUtil.getPhysicalTableName(configuration);
- try(final HTable htable = new HTable(configuration, physicalIndexTable)) {
- HFileOutputFormat.configureIncrementalLoad(job, htable);
- }
- return job;
- }
-
- /**
* Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
* waits for the job completion based on runForeground parameter.
*
@@ -581,9 +546,7 @@ public class IndexTool extends Configured implements Tool {
* @return
* @throws Exception
*/
- private Job configureSubmittableJobUsingDirectApi(Job job)
- throws Exception {
-
+ private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception {
job.setReducerClass(PhoenixIndexImportDirectReducer.class);
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -633,7 +596,6 @@ public class IndexTool extends Configured implements Tool {
try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
}
- useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
@@ -697,7 +659,7 @@ public class IndexTool extends Configured implements Tool {
jobFactory = new JobFactory(connection, configuration, outputPath);
job = jobFactory.getJob();
- if (!isForeground && useDirectApi) {
+ if (!isForeground) {
LOGGER.info("Running Index Build in Background - Submit async and exit");
job.submit();
return 0;
@@ -707,18 +669,6 @@ public class IndexTool extends Configured implements Tool {
boolean result = job.waitForCompletion(true);
if (result) {
- if (!useDirectApi && indexTable != null) {
- if (isLocalIndexBuild) {
- validateSplitForLocalIndex(splitKeysBeforeJob, htable);
- }
- LOGGER.info("Loading HFiles from {}", outputPath);
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
- loader.doBulkLoad(outputPath, htable);
- htable.close();
- // Without direct API, we need to update the index state to ACTIVE from client.
- IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE);
- fs.delete(outputPath, true);
- }
return 0;
} else {
LOGGER.error("IndexTool job failed! Check logs for errors..");
@@ -919,7 +869,7 @@ public class IndexTool extends Configured implements Tool {
}
public static Map.Entry<Integer, Job> run(Configuration conf, String schemaName, String dataTable, String indexTable,
- boolean directApi, boolean useSnapshot, String tenantId, boolean disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception {
+ boolean useSnapshot, String tenantId, boolean disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception {
final List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -929,9 +879,6 @@ public class IndexTool extends Configured implements Tool {
args.add(dataTable);
args.add("-it");
args.add(indexTable);
- if (directApi) {
- args.add("-direct");
- }
if (runForeground) {
args.add("-runfg");