You are viewing a plain-text version of this content. The canonical link to the original page was not preserved in this plain-text copy.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/03/24 02:47:02 UTC
[phoenix] branch master updated: PHOENIX-5732: Implement endtime in IndexTool for rebuild and verification
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 9379540 PHOENIX-5732: Implement endtime in IndexTool for rebuild and verification
9379540 is described below
commit 9379540a5386721dfb08b0f7087ade18949ebf29
Author: s.kadam <s....@apache.org>
AuthorDate: Thu Mar 19 13:28:33 2020 -0700
PHOENIX-5732: Implement endtime in IndexTool for rebuild and verification
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 24 +-
.../phoenix/end2end/IndexToolTimeRangeIT.java | 218 ++++++++++
.../PhoenixServerBuildIndexInputFormat.java | 21 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 470 +++++++++++----------
.../mapreduce/util/PhoenixConfigurationUtil.java | 24 ++
.../org/apache/phoenix/index/IndexToolTest.java | 203 +++++++++
.../util/PhoenixConfigurationUtilTest.java | 15 +
7 files changed, 743 insertions(+), 232 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 5cf9da6..c123741 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -962,8 +962,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
- String dataTable, String indxTable, String tenantId,
- IndexTool.IndexVerifyType verifyType) {
+ String dataTable, String indxTable, String tenantId,
+ IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -988,7 +988,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
args.add("-tenant");
args.add(tenantId);
}
-
+ if(startTime != null) {
+ args.add("-st");
+ args.add(String.valueOf(startTime));
+ }
+ if(endTime != null) {
+ args.add("-et");
+ args.add(String.valueOf(endTime));
+ }
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args;
@@ -996,7 +1003,16 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
- List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId, verifyType);
+ List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
+ tenantId, verifyType, null, null);
+ return args.toArray(new String[0]);
+ }
+
+ public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
+ String dataTable, String indexTable, String tenantId,
+ IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
+ List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
+ tenantId, verifyType, startTime, endTime);
return args.toArray(new String[0]);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java
new file mode 100644
index 0000000..09446c6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+
+public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT {
+ private static final String
+ CREATE_TABLE_DDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, "
+ + "VAL1 INTEGER, VAL2 INTEGER) COLUMN_ENCODED_BYTES=0";
+ public static final String CREATE_INDEX_DDL = "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)";
+ private static final String UPSERT_TABLE_DML = "UPSERT INTO %s VALUES(?,?,?)";
+
+ private static String dataTableFullName, indexTableFullName,
+ schemaName, dataTableName, indexTableName;
+ static CustomEnvironmentEdge customEdge = new CustomEnvironmentEdge();
+
+ @BeforeClass
+ public static synchronized void setup() throws Exception {
+ setupMiniCluster();
+ createTableAndIndex();
+ populateDataTable();
+ }
+
+ private static void createTableAndIndex() throws SQLException {
+ schemaName = generateUniqueName();
+ dataTableName = "D_"+generateUniqueName();
+ dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ indexTableName = "I_"+generateUniqueName();
+ indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ try (Connection conn = getConnection()) {
+ incrementEdgeByOne();
+ conn.createStatement().execute(String.format(CREATE_TABLE_DDL, dataTableFullName));
+ conn.commit();
+ incrementEdgeByOne();
+ conn.createStatement().execute(String.format(CREATE_INDEX_DDL, indexTableName,
+ dataTableFullName));
+ conn.commit();
+ }
+ }
+
+ private static void incrementEdgeByOne() {
+ customEdge.incrementValue(1);
+ EnvironmentEdgeManager.injectEdge(customEdge);
+ }
+
+ private static void populateDataTable() throws SQLException {
+ try (Connection conn = getConnection()) {
+ //row 1-> time 4, row 2-> time 5, row 3-> time 6, row 4-> time 7, row 5-> time 8
+ for (int i=0; i<5; i++) {
+ incrementEdgeByOne();
+ PreparedStatement ps = conn.prepareStatement(
+ String.format(UPSERT_TABLE_DML, dataTableFullName));
+ ps.setInt(1, i+1);
+ ps.setInt(2,(i+1)*10);
+ ps.setInt(3, (i+1)*100);
+ ps.execute();
+ conn.commit();
+ }
+ }
+ }
+
+ private static void setupMiniCluster() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+ serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+ serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+ Long.toString(5));
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+ QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+ clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+ clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+ clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+ clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ private int countRowsInIndex() throws SQLException {
+ incrementEdgeByOne();
+ String select = "SELECT COUNT(*) FROM "+indexTableFullName;
+ try(Connection conn = getConnection()) {
+ ResultSet rs = conn.createStatement().executeQuery(select);
+ while(rs.next()) {
+ return rs.getInt(1);
+ }
+ }
+ return -1;
+ }
+
+ private static Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(getUrl(),
+ PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ }
+
+ @Before
+ public void beforeTest() {
+ incrementEdgeByOne();
+ }
+
+ @Test
+ public void testValidTimeRange() throws Exception {
+ String [] args = {"--deleteall", "--starttime", "1" , "--endtime", "9"};
+ runIndexTool(args, 0);
+ // all rows should be rebuilt
+ Assert.assertEquals(5, countRowsInIndex());
+ }
+
+ @Ignore("Until PHOENIX-5783 is fixed")
+ @Test
+ public void testValidTimeRange_startTimeInBetween() throws Exception {
+ String [] args = {"--deleteall", "--starttime", "6" , "--endtime", "9"};
+ runIndexTool(args, 0);
+ // only last 3 rows should be rebuilt
+ Assert.assertEquals(3, countRowsInIndex());
+ }
+
+ @Test
+ public void testValidTimeRange_endTimeInBetween() throws Exception {
+ String [] args = {"--deleteall", "--starttime", "1" , "--endtime", "6"};
+ runIndexTool(args, 0);
+ // only first 2 should be rebuilt
+ Assert.assertEquals(2, countRowsInIndex());
+ }
+
+ @Test
+ public void testNoTimeRangePassed() throws Exception {
+ String [] args = {"--deleteall"};
+ runIndexTool(args, 0);
+ // all rows should be rebuilt
+ Assert.assertEquals(5, countRowsInIndex());
+ }
+
+ @Ignore("Until PHOENIX-5783 is fixed")
+ @Test
+ public void testValidTimeRange_onlyStartTimePassed() throws Exception {
+ //starttime passed of last upsert
+ String [] args = {"--deleteall", "--starttime", "8"};
+ runIndexTool(args, 0);
+ Assert.assertEquals(1, countRowsInIndex());
+ }
+
+ @Test
+ public void testValidTimeRange_onlyEndTimePassed() throws Exception {
+ //end time passed as time of second upsert
+ String [] args = {"--deleteall", "--endtime", "5"};
+ runIndexTool(args, 0);
+ Assert.assertEquals(1, countRowsInIndex());
+ }
+
+ private void runIndexTool(String [] args, int expectedStatus) throws Exception {
+ IndexToolIT.runIndexTool(true, false, schemaName, dataTableName,
+ indexTableName, null, expectedStatus,
+ IndexTool.IndexVerifyType.NONE, args);
+ }
+
+ private static class CustomEnvironmentEdge extends EnvironmentEdge {
+ protected long value = 1L;
+
+ public void setValue(long newValue) {
+ value = newValue;
+ }
+
+ public void incrementValue(long addedValue) {
+ value += addedValue;
+ }
+
+ @Override
+ public long currentTime() {
+ return this.value;
+ }
+ }
+
+ @AfterClass
+ public static void teardown() {
+ tearDownMiniClusterAsync(2);
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index d086fed..66d9313 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -39,8 +39,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getCurrentScnValue;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
/**
@@ -60,20 +64,22 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
}
@Override
- protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+ protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
throws IOException {
Preconditions.checkNotNull(context);
if (queryPlan != null) {
return queryPlan;
}
final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
- final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ final String currentScnValue = getCurrentScnValue(configuration);
final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+ //until PHOENIX-5783 is fixed; we'll continue with startTime = 0
+ final String startTimeValue = null;
final Properties overridingProps = new Properties();
- if(txnScnValue==null && currentScnValue!=null) {
+ if (txnScnValue==null && currentScnValue!=null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
}
- if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+ if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null) {
overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
String dataTableFullName = getIndexToolDataTableName(configuration);
@@ -82,8 +88,9 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
- configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
- Long.toString(scn));
+ setCurrentScnValue(configuration, scn);
+
+ Long startTime = (startTimeValue == null) ? 0L : Long.valueOf(startTimeValue);
PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
ServerBuildIndexCompiler compiler =
new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
@@ -91,7 +98,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
Scan scan = plan.getContext().getScan();
try {
- scan.setTimeRange(0, scn);
+ scan.setTimeRange(startTime, scn);
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE,
PhoenixConfigurationUtil.getIndexVerifyType(configuration).toBytes());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index c71c0ff..08e1564 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -89,16 +90,19 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.HConnectionFactory;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.EquiDepthStreamHistogram;
import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket;
import org.apache.phoenix.util.IndexUtil;
@@ -126,7 +130,7 @@ public class IndexTool extends Configured implements Tool {
private String value;
private byte[] valueBytes;
- private IndexVerifyType(String value) {
+ IndexVerifyType(String value) {
this.value = value;
this.valueBytes = PVarchar.INSTANCE.toBytes(value);
}
@@ -201,19 +205,21 @@ public class IndexTool extends Configured implements Tool {
private String schemaName;
private String dataTable;
private String indexTable;
- private boolean isPartialBuild;
+ private boolean isPartialBuild, isForeground;
private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
private String qDataTable;
private String qIndexTable;
private boolean useSnapshot;
- private boolean isLocalIndexBuild;
+ private boolean isLocalIndexBuild = false;
private boolean shouldDeleteBeforeRebuild;
- private PTable pIndexTable;
+ private PTable pIndexTable = null;
private PTable pDataTable;
- private String tenantId;
+ private String tenantId = null;
private Job job;
-
-
+ private Long startTime, endTime;
+ private IndexType indexType;
+ private String basePath;
+ byte[][] splitKeysBeforeJob = null;
private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
"Phoenix schema name (optional)");
@@ -272,8 +278,20 @@ public class IndexTool extends Configured implements Tool {
+ "If specified, truncates the index table and rebuilds (optional)");
private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+ private static final Option START_TIME_OPTION = new Option("st", "starttime",
+ true, "Start time for indextool rebuild or verify");
+ private static final Option END_TIME_OPTION = new Option("et", "endtime",
+ true, "End time for indextool rebuild or verify");
+
public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
+ public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
+ + "or equal to endTime "
+ + "or either of them are set in the future; IndexTool can't proceed.";
+
+ public static final String FEATURE_NOT_APPLICABLE = "starttime-endtime feature is only "
+ + "applicable for local or non-transactional global indexes";
+
private Options getOptions() {
final Options options = new Options();
options.addOption(SCHEMA_NAME_OPTION);
@@ -289,9 +307,13 @@ public class IndexTool extends Configured implements Tool {
options.addOption(DELETE_ALL_AND_REBUILD_OPTION);
options.addOption(HELP_OPTION);
AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
- options.addOption(AUTO_SPLIT_INDEX_OPTION);
SPLIT_INDEX_OPTION.setOptionalArg(true);
+ START_TIME_OPTION.setOptionalArg(true);
+ END_TIME_OPTION.setOptionalArg(true);
+ options.addOption(AUTO_SPLIT_INDEX_OPTION);
options.addOption(SPLIT_INDEX_OPTION);
+ options.addOption(START_TIME_OPTION);
+ options.addOption(END_TIME_OPTION);
return options;
}
@@ -301,7 +323,8 @@ public class IndexTool extends Configured implements Tool {
* @param args supplied command line arguments
* @return the parsed command line
*/
- private CommandLine parseOptions(String[] args) {
+ @VisibleForTesting
+ public CommandLine parseOptions(String[] args) {
final Options options = getOptions();
@@ -322,15 +345,18 @@ public class IndexTool extends Configured implements Tool {
+ "parameter");
}
- if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
- throw new IllegalStateException("Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
- }
+ if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())
+ && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+ throw new IllegalStateException("Index name should not be passed with "
+ + PARTIAL_REBUILD_OPTION.getLongOpt());
+ }
- if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+ if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())
+ && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
throw new IllegalStateException("Index name should be passed unless it is a partial rebuild.");
}
- if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) {
+ if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) {
throw new IllegalStateException(DELETE_ALL_AND_REBUILD_OPTION.getLongOpt() + " is not compatible with "
+ PARTIAL_REBUILD_OPTION.getLongOpt());
}
@@ -356,6 +382,14 @@ public class IndexTool extends Configured implements Tool {
System.exit(exitCode);
}
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public Long getEndTime() {
+ return endTime;
+ }
+
class JobFactory {
Connection connection;
Configuration configuration;
@@ -366,13 +400,6 @@ public class IndexTool extends Configured implements Tool {
this.connection = connection;
this.configuration = configuration;
this.outputPath = outputPath;
-
- }
-
- void closeConnection() throws SQLException {
- if (this.connection != null) {
- this.connection.close();
- }
}
public Job getJob() throws Exception {
@@ -387,14 +414,19 @@ public class IndexTool extends Configured implements Tool {
configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
}
if (useSnapshot || (!isLocalIndexBuild && pDataTable.isTransactional())) {
- configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
- Long.toString(maxTimeRange));
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, maxTimeRange);
return configureJobForAsyncIndex();
} else {
// Local and non-transactional global indexes to be built on the server side
// It is safe not to set CURRENT_SCN_VALUE for server side rebuilds, in order to make sure that
// all the rows that exist so far will be rebuilt. The current time of the servers will
// be used to set the time range for server side scans.
+
+ // However, PHOENIX-5732 introduces endTime parameter to be passed optionally for IndexTool.
+ // When endTime is passed for local and non-tx global indexes, we'll override the CURRENT_SCN_VALUE.
+ if (endTime != null) {
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+ }
return configureJobForServerBuildIndex();
}
}
@@ -472,9 +504,9 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.setDisableIndexes(configuration, StringUtils.join(",",disableIndexes));
final Job job = Job.getInstance(configuration, jobName);
- if (outputPath != null) {
- FileOutputFormat.setOutputPath(job, outputPath);
- }
+ if (outputPath != null) {
+ FileOutputFormat.setOutputPath(job, outputPath);
+ }
job.setJarByClass(IndexTool.class);
TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, PhoenixIndexPartialBuildMapper.class, null,
null, job);
@@ -491,14 +523,15 @@ public class IndexTool extends Configured implements Tool {
for (String index : disableIndexes) {
quotedIndexes.add("'" + index + "'");
}
- ResultSet rs = connection.createStatement()
+ try (ResultSet rs = connection.createStatement()
.executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " ("
+ ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " + TABLE_SCHEM
+ (schemaName != null && schemaName.length() > 0 ? "='" + schemaName + "'" : " IS NULL")
- + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")");
- if (rs.next()) {
- maxRebuilAsyncDate = rs.getLong(1);
- maxDisabledTimeStamp = rs.getLong(2);
+ + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")")) {
+ if (rs.next()) {
+ maxRebuilAsyncDate = rs.getLong(1);
+ maxDisabledTimeStamp = rs.getLong(2);
+ }
}
// Do check if table is disabled again after user invoked async rebuilding during the run of the job
if (maxRebuilAsyncDate > maxDisabledTimeStamp) {
@@ -507,7 +540,6 @@ public class IndexTool extends Configured implements Tool {
throw new RuntimeException(
"Inconsistent state we have one or more index tables which are disabled after the async is called!!");
}
-
}
private Job configureJobForAsyncIndex() throws Exception {
@@ -602,6 +634,9 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
+ if (startTime != null) {
+ PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
+ }
PhoenixConfigurationUtil.setIndexVerifyType(configuration, indexVerifyType);
String physicalIndexTable = pIndexTable.getPhysicalName().getString();
@@ -631,17 +666,6 @@ public class IndexTool extends Configured implements Tool {
return configureSubmittableJobUsingDirectApi(job);
}
- /**
- * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
- * waits for the job completion based on runForeground parameter.
- *
- * @param job
- * @param outputPath
- * @param runForeground - if true, waits for job completion, else submits and returns
- * immediately.
- * @return
- * @throws Exception
- */
private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception {
job.setReducerClass(PhoenixIndexImportDirectReducer.class);
Configuration conf = job.getConfiguration();
@@ -688,118 +712,26 @@ public class IndexTool extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- Connection connection = null;
- Table htable = null;
- RegionLocator regionLocator = null;
- JobFactory jobFactory = null;
- org.apache.hadoop.hbase.client.Connection hConn = null;
+ CommandLine cmdLine;
try {
- CommandLine cmdLine = null;
- try {
- cmdLine = parseOptions(args);
- } catch (IllegalStateException e) {
- printHelpAndExit(e.getMessage(), getOptions());
- }
- final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
- boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
- tenantId = null;
- if (useTenantId) {
- tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
- configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
- }
-
- schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
- dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
- indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
- isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
- if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
- String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
- indexVerifyType = IndexVerifyType.fromValue(value);
- }
- qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
- try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
- pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
- }
- String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
- boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
- useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
- shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt());
-
- byte[][] splitKeysBeforeJob = null;
- isLocalIndexBuild = false;
- pIndexTable = null;
-
- connection = ConnectionUtil.getInputConnection(configuration);
- createIndexToolTables(connection);
-
- if (indexTable != null) {
- if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
- throw new IllegalArgumentException(String.format(
- " %s is not an index table for %s for this connection", indexTable, qDataTable));
- }
- pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
- ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
- if (schemaName != null && !schemaName.isEmpty()) {
- qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
- } else {
- qIndexTable = indexTable;
- }
- htable = connection.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(pIndexTable.getPhysicalName().getBytes());
- hConn = ConnectionFactory.createConnection(configuration);
- regionLocator = hConn.getRegionLocator(
- TableName.valueOf(pIndexTable.getPhysicalName().getBytes()));
- if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
- isLocalIndexBuild = true;
- splitKeysBeforeJob = regionLocator.getStartKeys();
- }
-
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ return -1;
+ }
+ populateIndexToolAttributes(cmdLine);
+ Configuration configuration = getConfiguration(tenantId);
+ try (Connection conn = getConnection(configuration)) {
+ createIndexToolTables(conn);
+ if (dataTable != null && indexTable != null) {
+ setupIndexAndDataTable(conn);
+ checkTimeRangeFeature(startTime, endTime, pDataTable, isLocalIndexBuild);
if (shouldDeleteBeforeRebuild) {
- deleteBeforeRebuild(connection);
- }
-
- // presplit the index table
- boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
- boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables
- if (!isSalted && IndexType.GLOBAL.equals(pIndexTable.getIndexType())
- && (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) {
- String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
- int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
- String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
- double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
- LOGGER.info(String.format("Will split index %s , autosplit=%s ," +
- " autoSplitNumRegions=%s , samplingRate=%s", indexTable,
- autosplit, autosplitNumRegions, samplingRate));
- splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, autosplitNumRegions, samplingRate, configuration);
+ deleteBeforeRebuild(conn);
}
}
- Path outputPath = null;
- FileSystem fs = null;
- if (basePath != null) {
- outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pIndexTable == null
- ? pDataTable.getPhysicalName().getString() : pIndexTable.getPhysicalName().getString());
- fs = outputPath.getFileSystem(configuration);
- fs.delete(outputPath, true);
- }
-
- // We have to mark Disable index to Building before we can set it to Active in the reducer. Otherwise it errors out with
- // index state transition error
- if (pIndexTable != null && pIndexTable.getIndexState() == PIndexState.DISABLE) {
- IndexUtil.updateIndexState(connection.unwrap(PhoenixConnection.class),
- pIndexTable.getName().getString(), PIndexState.BUILDING, null);
- }
- jobFactory = new JobFactory(connection, configuration, outputPath);
- job = jobFactory.getJob();
-
- if (!isForeground) {
- LOGGER.info("Running Index Build in Background - Submit async and exit");
- job.submit();
- return 0;
- }
- LOGGER.info("Running Index Build in Foreground. Waits for the build to complete." +
- " This may take a long time!.");
- boolean result = job.waitForCompletion(true);
-
+ preSplitIndexTable(cmdLine, conn);
+ boolean result = submitIndexToolJob(conn, configuration);
if (result) {
return 0;
} else {
@@ -808,64 +740,168 @@ public class IndexTool extends Configured implements Tool {
}
} catch (Exception ex) {
LOGGER.error("An exception occurred while performing the indexing job: "
- + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
+ + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
return -1;
- } finally {
- boolean rethrowException = false;
- try {
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- LOGGER.error("Failed to close connection ", e);
- rethrowException = true;
- }
- }
- if (htable != null) {
- try {
- htable.close();
- } catch (IOException e) {
- LOGGER.error("Failed to close htable ", e);
- rethrowException = true;
- }
- }
- if (hConn != null) {
- try {
- hConn.close();
- } catch (IOException e) {
- LOGGER.error("Failed to close hconnection ", e);
- rethrowException = true;
- }
- }
- if (regionLocator != null) {
- try {
- regionLocator.close();
- } catch (IOException e) {
- LOGGER.error("Failed to close regionLocator ", e);
- rethrowException = true;
- }
- }
- if (jobFactory != null) {
- try {
- jobFactory.closeConnection();
- } catch (SQLException e) {
- LOGGER.error("Failed to close jobFactory ", e);
- rethrowException = true;
- }
- }
- } finally {
- if (rethrowException) {
- throw new RuntimeException("Failed to close resource");
- }
+ }
+ }
+
+    /**
+     * Rejects a time-range build that cannot be honoured for this index/table combination.
+     *
+     * <p>Nothing is checked unless a start or end time was supplied. When one was, the build is
+     * allowed only for a local index build, or when the data table is not transactional.
+     *
+     * @param startTime requested start time, or null
+     * @param endTime requested end time, or null
+     * @param pDataTable the data table being indexed; only inspected when a time range is set
+     *                   and this is not a local index build
+     * @param isLocalIndexBuild whether a local index is being built
+     * @throws RuntimeException with message {@code FEATURE_NOT_APPLICABLE} when a time range was
+     *                          given but is not applicable
+     */
+    public static void checkTimeRangeFeature(Long startTime, Long endTime, PTable pDataTable, boolean isLocalIndexBuild) {
+        if (!isTimeRangeSet(startTime, endTime)) {
+            return;
+        }
+        if (!isTimeRangeFeatureApplicable(pDataTable, isLocalIndexBuild)) {
+            throw new RuntimeException(FEATURE_NOT_APPLICABLE);
+        }
+    }
+
+    /**
+     * Builds the job configuration: this tool's configuration with HBase resources added,
+     * plus the tenant id attribute when a tenant id is given.
+     *
+     * @param tenantId tenant to run as, or null for no tenant
+     * @return the configuration to use for the index build
+     */
+    private Configuration getConfiguration(String tenantId) {
+        final Configuration conf = HBaseConfiguration.addHbaseResources(getConf());
+        if (tenantId == null) {
+            return conf;
+        }
+        conf.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return conf;
+    }
+
+    /**
+     * Creates the index build job (stored in {@code job}) and runs it.
+     *
+     * <p>When an output base path is configured, any existing content at the table-specific
+     * output path is deleted recursively first. The output path is derived from the index
+     * table's physical name, or the data table's when there is no index table.
+     *
+     * @param conn Phoenix connection handed to the JobFactory
+     * @param configuration job configuration
+     * @return true once the job is submitted (background mode), otherwise whether the
+     *         foreground job completed successfully
+     * @throws Exception if creating, submitting or running the job fails
+     */
+    private boolean submitIndexToolJob(Connection conn, Configuration configuration)
+            throws Exception {
+        Path outputPath = null;
+        if (basePath != null) {
+            Path base = new Path(basePath);
+            String physicalTableName = (pIndexTable != null)
+                    ? pIndexTable.getPhysicalName().getString()
+                    : pDataTable.getPhysicalName().getString();
+            outputPath = CsvBulkImportUtil.getOutputPath(base, physicalTableName);
+            FileSystem outputFs = outputPath.getFileSystem(configuration);
+            outputFs.delete(outputPath, true);
+        }
+        job = new JobFactory(conn, configuration, outputPath).getJob();
+        if (isForeground) {
+            LOGGER.info("Running Index Build in Foreground. Waits for the build to complete."
+                    + " This may take a long time!.");
+            return job.waitForCompletion(true);
+        }
+        LOGGER.info("Running Index Build in Background - Submit async and exit");
+        job.submit();
+        return true;
+    }
+
+    /**
+     * Copies the parsed command-line options into this tool's fields.
+     *
+     * <p>If a start and/or end time was supplied, the resulting range is checked by
+     * {@code validateTimeRange()}, which throws a RuntimeException for an invalid range.
+     *
+     * @param cmdLine the parsed IndexTool command line
+     * @throws NumberFormatException if a start or end time value is not a valid long
+     */
+    @VisibleForTesting
+    public void populateIndexToolAttributes(CommandLine cmdLine) {
+        boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
+        boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
+        boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+        boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
+
+        if (useTenantId) {
+            tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+        }
+        // Long.valueOf replaces the deprecated Long(String) constructor; both throw
+        // NumberFormatException on malformed input.
+        if (useStartTime) {
+            startTime = Long.valueOf(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
+        }
+        if (useEndTime) {
+            endTime = Long.valueOf(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
+        }
+        if (isTimeRangeSet(startTime, endTime)) {
+            validateTimeRange();
+        }
+        if (verify) {
+            String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+            indexVerifyType = IndexVerifyType.fromValue(value);
+        }
+        schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+        dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+        indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+        isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+        qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+        basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+        useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
+        shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt());
+    }
+
+    /**
+     * Validates the requested time range against the current time.
+     *
+     * <p>A missing start time is treated as 0 and a missing end time as the current time. The
+     * range is rejected if either bound is later than the current time, or if the start time is
+     * not strictly before the end time.
+     *
+     * @throws RuntimeException with message {@code INVALID_TIME_RANGE_EXCEPTION_MESSAGE} when the
+     *                          range is invalid
+     */
+    private void validateTimeRange() {
+        // Primitive longs avoid needless boxing; comparisons are the same as Long.compareTo.
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        long st = (startTime == null) ? 0L : startTime;
+        long et = (endTime == null) ? currentTime : endTime;
+        if (st > currentTime || et > currentTime || st >= et) {
+            throw new RuntimeException(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        }
+    }
+
+ private Connection getConnection(Configuration configuration) throws SQLException {
+ return ConnectionUtil.getInputConnection(configuration);
+ }
+
+    /**
+     * Resolves the data and index tables named on the command line and initialises
+     * pDataTable, pIndexTable, indexType and qIndexTable.
+     *
+     * <p>For a local index, it also sets isLocalIndexBuild and records the index table's region
+     * start keys in splitKeysBeforeJob. Finally, it moves a DISABLE index to BUILDING.
+     *
+     * @param connection Phoenix connection used to look up the tables
+     * @throws IllegalArgumentException if indexTable is not an index of the data table
+     */
+    private void setupIndexAndDataTable(Connection connection) throws SQLException, IOException {
+        pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+        if (!isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
+            throw new IllegalArgumentException(
+                    String.format(" %s is not an index table for %s for this connection",
+                            indexTable, qDataTable));
+        }
+        // Qualify the index name once and reuse it for the lookup and for qIndexTable.
+        String qualifiedIndexName = (schemaName != null && !schemaName.isEmpty())
+                ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable;
+        pIndexTable = PhoenixRuntime.getTable(connection, qualifiedIndexName);
+        indexType = pIndexTable.getIndexType();
+        qIndexTable = qualifiedIndexName;
+        if (IndexType.LOCAL.equals(indexType)) {
+            isLocalIndexBuild = true;
+            // The RegionLocator is Closeable too, so it is a try-with-resources resource
+            // alongside the temporary HBase connection instead of being left open.
+            try (org.apache.hadoop.hbase.client.Connection hConn =
+                         getTemporaryHConnection(connection.unwrap(PhoenixConnection.class));
+                 RegionLocator regionLocator = hConn
+                         .getRegionLocator(TableName.valueOf(pIndexTable.getPhysicalName().getBytes()))) {
+                splitKeysBeforeJob = regionLocator.getStartKeys();
             }
         }
+        // We have to mark Disable index to Building before we can set it to Active in the reducer. Otherwise it errors out with
+        // index state transition error
+        changeDisabledIndexStateToBuiding(connection);
+    }
+
+    /** Returns true when at least one of the start time or end time was supplied. */
+    private static boolean isTimeRangeSet(Long startTime, Long endTime) {
+        boolean neitherSupplied = startTime == null && endTime == null;
+        return !neitherSupplied;
+    }
+
+    /**
+     * Whether a start/end time may be used for this build: always for a local index build,
+     * otherwise only when the data table is not transactional.
+     *
+     * @param dataTable the data table; only inspected when isLocalIndexBuild is false
+     * @param isLocalIndexBuild whether a local index is being built
+     * @return true if a time range can be applied
+     */
+    private static boolean isTimeRangeFeatureApplicable(PTable dataTable, boolean isLocalIndexBuild) {
+        return isLocalIndexBuild || !dataTable.isTransactional();
+    }
+
+    /**
+     * Moves the index from DISABLE to BUILDING; does nothing if there is no index table or it is
+     * in any other state. A DISABLE index has to pass through BUILDING before it can be set to
+     * ACTIVE, otherwise an index state transition error occurs.
+     *
+     * <p>Note: the method name is misspelled ("Buiding") but is kept because callers use it.
+     *
+     * @param connection connection used to update the index state
+     * @throws SQLException if the state update fails
+     */
+    private void changeDisabledIndexStateToBuiding(Connection connection) throws SQLException {
+        if (pIndexTable == null || pIndexTable.getIndexState() != PIndexState.DISABLE) {
+            return;
+        }
+        PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
+        IndexUtil.updateIndexState(phoenixConnection, pIndexTable.getName().getString(),
+                PIndexState.BUILDING, null);
+    }
+
+    /**
+     * Pre-splits an unsalted global index table when the auto-split or split-index option
+     * was given on the command line.
+     *
+     * <p>The auto-split option value (default DEFAULT_AUTOSPLIT_NUM_REGIONS) and the split-index
+     * option value (default DEFAULT_SPLIT_SAMPLING_RATE) are passed to splitIndexTable as
+     * autosplitNumRegions and samplingRate.
+     *
+     * @param cmdLine the parsed command line
+     * @param connection connection unwrapped to a PhoenixConnection for the split
+     */
+    private void preSplitIndexTable(CommandLine cmdLine, Connection connection)
+            throws SQLException, IOException {
+        boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
+        boolean splitIndex = cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
+        boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables
+        boolean splitRequested = autosplit || splitIndex;
+        if (isSalted || !IndexType.GLOBAL.equals(indexType) || !splitRequested) {
+            return;
+        }
+        String regionsOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
+        int autosplitNumRegions = (regionsOpt != null)
+                ? Integer.parseInt(regionsOpt) : DEFAULT_AUTOSPLIT_NUM_REGIONS;
+        String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
+        double samplingRate = (rateOpt != null)
+                ? Double.parseDouble(rateOpt) : DEFAULT_SPLIT_SAMPLING_RATE;
+        LOGGER.info(String.format("Will split index %s , autosplit=%s ,"
+                + " autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit,
+                autosplitNumRegions, samplingRate));
+
+        PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
+        splitIndexTable(pConnection, autosplit, autosplitNumRegions, samplingRate);
     }
private void deleteBeforeRebuild(Connection conn) throws SQLException, IOException {
if (MetaDataUtil.isViewIndex(pIndexTable.getPhysicalName().getString())) {
throw new IllegalArgumentException(String.format(
- "%s is a view index. delete-all-and-rebuild is not supported for view indexes",
- indexTable));
+ "%s is a view index. delete-all-and-rebuild is not supported for view indexes",
+ indexTable));
}
if (isLocalIndexBuild) {
@@ -881,16 +917,16 @@ public class IndexTool extends Configured implements Tool {
}
}
- private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration)
- throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+ private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit,
+ int autosplitNumRegions, double samplingRate)
+ throws SQLException, IOException, IllegalArgumentException {
int numRegions;
- try (org.apache.hadoop.hbase.client.Connection tempHConn =
- ConnectionFactory.createConnection(configuration);
+ try (org.apache.hadoop.hbase.client.Connection tempHConn = getTemporaryHConnection(pConnection);
RegionLocator regionLocator =
tempHConn.getRegionLocator(TableName.valueOf(qDataTable))) {
numRegions = regionLocator.getStartKeys().length;
- if (autosplit && !(numRegions > autosplitNumRegions)) {
+ if (autosplit && (numRegions <= autosplitNumRegions)) {
LOGGER.info(String.format(
"Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s",
pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions));
@@ -899,7 +935,7 @@ public class IndexTool extends Configured implements Tool {
}
// build a tablesample query to fetch index column values from the data table
DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable);
- String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate);
+ String qTableSample = String.format("%s TABLESAMPLE(%.2f)", qDataTable, samplingRate);
List<String> dataColNames = colNames.getDataColNames();
final String dataSampleQuery =
QueryUtil.constructSelectStatement(qTableSample, dataColNames, null,
@@ -936,6 +972,11 @@ public class IndexTool extends Configured implements Tool {
}
}
+    /**
+     * Opens a new HBase connection using the configuration of the Phoenix connection's HBase
+     * Admin. The caller owns the returned connection and must close it.
+     *
+     * @param pConnection Phoenix connection whose query services supply the configuration
+     * @return a newly created HBase connection
+     */
+    private org.apache.hadoop.hbase.client.Connection getTemporaryHConnection(PhoenixConnection pConnection)
+            throws SQLException, IOException {
+        // The Admin is Closeable and, per HBase's Connection#getAdmin contract (which Phoenix's
+        // getAdmin presumably delegates to), the caller must close it. Only its configuration
+        // is needed, so release it as soon as the new connection has been created.
+        try (org.apache.hadoop.hbase.client.Admin admin = pConnection.getQueryServices().getAdmin()) {
+            return ConnectionFactory.createConnection(admin.getConfiguration());
+        }
+    }
+
// setup a ValueGetter to get index values from the ResultSet
private ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) {
// map from data col name to index in ResultSet
@@ -944,7 +985,7 @@ public class IndexTool extends Configured implements Tool {
for (String dataCol : dataColNames) {
rsIndex.put(SchemaUtil.getEscapedFullColumnName(dataCol), i++);
}
- ValueGetter getter = new ValueGetter() {
+ return new ValueGetter() {
final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable();
@@ -968,19 +1009,6 @@ public class IndexTool extends Configured implements Tool {
return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr);
}
};
- return getter;
- }
-
- private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, RegionLocator regionLocator) throws Exception {
- if (splitKeysBeforeJob != null
- && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) {
- String errMsg = "The index to build is local index and the split keys are not matching"
- + " before and after running the job. Please rerun the job otherwise"
- + " there may be inconsistencies between actual data and index data";
- LOGGER.error(errMsg);
- throw new Exception(errMsg);
- }
- return true;
}
/**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 196c842..6331bf8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -163,6 +163,8 @@ public final class PhoenixConfigurationUtil {
public static final String RESTORE_DIR_KEY = "phoenix.tableSnapshot.restore.dir";
public static final String MAPREDUCE_TENANT_ID = "phoenix.mapreduce.tenantid";
+ // Configuration keys for the IndexTool time range. INDEX_TOOL_START_TIME is written/read by
+ // setIndexToolStartTime/getIndexToolStartTime.
+ // NOTE(review): INDEX_TOOL_END_TIME is not referenced by the accessors added in this change;
+ // the end time appears to be carried via CURRENT_SCN_VALUE instead -- confirm before using it.
+ private static final String INDEX_TOOL_END_TIME = "phoenix.mr.index.endtime";
+ private static final String INDEX_TOOL_START_TIME = "phoenix.mr.index.starttime";
public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
@@ -279,6 +281,28 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(restoreDir);
configuration.set(RESTORE_DIR_KEY, restoreDir);
}
+
+ public static void setIndexToolStartTime(Configuration configuration, Long startTime) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(startTime);
+ configuration.set(INDEX_TOOL_START_TIME, Long.toString(startTime));
+ }
+
+ public static void setCurrentScnValue(Configuration configuration, Long scn) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(scn);
+ configuration.set(CURRENT_SCN_VALUE, Long.toString(scn));
+ }
+
+    /**
+     * Returns the raw IndexTool start time string, or null if it has not been set.
+     *
+     * @param configuration configuration to read; must not be null
+     */
+    public static String getIndexToolStartTime(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        final String storedStartTime = configuration.get(INDEX_TOOL_START_TIME);
+        return storedStartTime;
+    }
+
+    /**
+     * Returns the raw CURRENT_SCN_VALUE string, or null if it has not been set.
+     *
+     * @param configuration configuration to read; must not be null
+     */
+    public static String getCurrentScnValue(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        final String storedScn = configuration.get(CURRENT_SCN_VALUE);
+        return storedScn;
+    }
public static List<String> getUpsertColumnNames(final Configuration configuration) {
return getValues(configuration, MAPREDUCE_UPSERT_COLUMN_COUNT, MAPREDUCE_UPSERT_COLUMN_VALUE_PREFIX);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
new file mode 100644
index 0000000..d82e99a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE;
+import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the IndexTool start/end time options: parsing and validation performed by
+ * {@link IndexTool#populateIndexToolAttributes(CommandLine)}, and the applicability check in
+ * {@link IndexTool#checkTimeRangeFeature(Long, Long, PTable, boolean)}.
+ */
+public class IndexToolTest extends BaseTest {
+
+    private IndexTool it;
+    private String dataTable;
+    private String indexTable;
+    private String schema;
+    private String tenantId;
+    @Mock
+    private PTable pDataTable;
+    private final boolean localIndex = true;
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    @Before
+    public void setup() {
+        it = new IndexTool();
+        schema = generateUniqueName();
+        dataTable = generateUniqueName();
+        indexTable = generateUniqueName();
+        tenantId = generateUniqueName();
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_timeRangeNotNull() {
+        Long startTime = 10L;
+        Long endTime = 15L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertEquals(startTime, it.getStartTime());
+        Assert.assertEquals(endTime, it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_null() {
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertNull(it.getStartTime());
+        Assert.assertNull(it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeNotNull() {
+        Long startTime = 10L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , null);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertEquals(startTime, it.getStartTime());
+        Assert.assertNull(it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_endTimeNotNull() {
+        Long endTime = 15L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        null , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertNull(it.getStartTime());
+        Assert.assertEquals(endTime, it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeNullEndTimeInFuture() {
+        Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        null , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_endTimeNullStartTimeInFuture() {
+        Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , null);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test(timeout = 10000 /* 10 secs */)
+    public void testParseOptions_timeRange_startTimeInFuture() {
+        Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 200000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test(timeout = 10000 /* 10 secs */)
+    public void testParseOptions_timeRange_endTimeInFuture() {
+        Long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeEqEndTime() {
+        Long startTime = 10L;
+        Long endTime = 10L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeGtEndTime() {
+        Long startTime = 10L;
+        Long endTime = 1L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testCheckTimeRangeFeature_timeRangeSet_transactionalTable_globalIndex() {
+        when(pDataTable.isTransactional()).thenReturn(true);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(FEATURE_NOT_APPLICABLE);
+        IndexTool.checkTimeRangeFeature(1L, 3L, pDataTable, !localIndex);
+    }
+
+    @Test
+    public void testCheckTimeRangeFeature_timeRangeSet_transactionalTable_localIndex() {
+        // A local index build accepts a time range even over a transactional table.
+        when(pDataTable.isTransactional()).thenReturn(true);
+        IndexTool.checkTimeRangeFeature(1L, 3L, pDataTable, localIndex);
+    }
+
+    @Test
+    public void testCheckTimeRangeFeature_timeRangeSet_nonTransactionalTable_globalIndex() {
+        when(pDataTable.isTransactional()).thenReturn(false);
+        IndexTool.checkTimeRangeFeature(1L, 3L, pDataTable, !localIndex);
+    }
+
+    @Test
+    public void testCheckTimeRangeFeature_timeRangeNotSet_transactionalTable_globalIndex() {
+        // Without a start or end time nothing is checked, even for an otherwise unsupported case.
+        when(pDataTable.isTransactional()).thenReturn(true);
+        IndexTool.checkTimeRangeFeature(null, null, pDataTable, !localIndex);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index cb5efa1..f58605f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -286,4 +287,18 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
}
+ @Test
+ public void testTimeRangeOverride() {
+ final Configuration configuration = new Configuration();
+ Long startTime = 1L;
+ Long endTime = 2L;
+
+ PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+ Assert.assertEquals(startTime.longValue(),
+ Long.parseLong(PhoenixConfigurationUtil.getIndexToolStartTime(configuration)));
+ Assert.assertEquals(endTime.longValue(),
+ Long.parseLong(PhoenixConfigurationUtil.getCurrentScnValue(configuration)));
+
+ }
}