You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/03/24 02:47:02 UTC

[phoenix] branch master updated: PHOENIX-5732: Implement endtime in IndexTool for rebuild and verification

This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9379540  PHOENIX-5732: Implement endtime in IndexTool for rebuild and verification
9379540 is described below

commit 9379540a5386721dfb08b0f7087ade18949ebf29
Author: s.kadam <s....@apache.org>
AuthorDate: Thu Mar 19 13:28:33 2020 -0700

    PHOENIX-5732: Implement endtime in IndexTool for rebuild and verification
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  24 +-
 .../phoenix/end2end/IndexToolTimeRangeIT.java      | 218 ++++++++++
 .../PhoenixServerBuildIndexInputFormat.java        |  21 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 470 +++++++++++----------
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  24 ++
 .../org/apache/phoenix/index/IndexToolTest.java    | 203 +++++++++
 .../util/PhoenixConfigurationUtilTest.java         |  15 +
 7 files changed, 743 insertions(+), 232 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 5cf9da6..c123741 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -962,8 +962,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
-                                            String dataTable, String indxTable, String tenantId,
-                                            IndexTool.IndexVerifyType verifyType) {
+                            String dataTable, String indxTable, String tenantId,
+                            IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
         List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -988,7 +988,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             args.add("-tenant");
             args.add(tenantId);
         }
-
+        if(startTime != null) {
+            args.add("-st");
+            args.add(String.valueOf(startTime));
+        }
+        if(endTime != null) {
+            args.add("-et");
+            args.add(String.valueOf(endTime));
+        }
         args.add("-op");
         args.add("/tmp/" + UUID.randomUUID().toString());
         return args;
@@ -996,7 +1003,16 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
 
     public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
                                         String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
-        List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId, verifyType);
+        List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
+                tenantId, verifyType, null, null);
+        return args.toArray(new String[0]);
+    }
+
+    public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
+            String dataTable, String indexTable, String tenantId,
+            IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
+        List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
+                tenantId, verifyType, startTime, endTime);
         return args.toArray(new String[0]);
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java
new file mode 100644
index 0000000..09446c6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+
+public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT {
+    private static final String
+            CREATE_TABLE_DDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, "
+            + "VAL1 INTEGER, VAL2 INTEGER) COLUMN_ENCODED_BYTES=0";
+    public static final String CREATE_INDEX_DDL = "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)";
+    private static final String UPSERT_TABLE_DML = "UPSERT INTO %s VALUES(?,?,?)";
+
+    private static String dataTableFullName, indexTableFullName,
+            schemaName, dataTableName, indexTableName;
+    static CustomEnvironmentEdge customEdge = new CustomEnvironmentEdge();
+
+    @BeforeClass
+    public static synchronized void setup() throws Exception {
+        setupMiniCluster();
+        createTableAndIndex();
+        populateDataTable();
+    }
+
+    private static void createTableAndIndex() throws SQLException {
+        schemaName = generateUniqueName();
+        dataTableName = "D_"+generateUniqueName();
+        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        indexTableName = "I_"+generateUniqueName();
+        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        try (Connection conn = getConnection()) {
+            incrementEdgeByOne();
+            conn.createStatement().execute(String.format(CREATE_TABLE_DDL, dataTableFullName));
+            conn.commit();
+            incrementEdgeByOne();
+            conn.createStatement().execute(String.format(CREATE_INDEX_DDL, indexTableName,
+                    dataTableFullName));
+            conn.commit();
+        }
+    }
+
+    private static void incrementEdgeByOne() {
+        customEdge.incrementValue(1);
+        EnvironmentEdgeManager.injectEdge(customEdge);
+    }
+
+    private static void populateDataTable() throws SQLException {
+        try (Connection conn = getConnection()) {
+            //row 1-> time 4, row 2-> time 5, row 3-> time 6, row 4-> time 7, row 5-> time 8
+            for (int i=0; i<5; i++) {
+                incrementEdgeByOne();
+                PreparedStatement ps = conn.prepareStatement(
+                        String.format(UPSERT_TABLE_DML, dataTableFullName));
+                ps.setInt(1, i+1);
+                ps.setInt(2,(i+1)*10);
+                ps.setInt(3, (i+1)*100);
+                ps.execute();
+                conn.commit();
+            }
+        }
+    }
+
+    private static void setupMiniCluster() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
+        serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                Long.toString(5));
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+        clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    private int countRowsInIndex() throws SQLException {
+        incrementEdgeByOne();
+        String select = "SELECT COUNT(*) FROM "+indexTableFullName;
+        try(Connection conn = getConnection()) {
+            ResultSet rs = conn.createStatement().executeQuery(select);
+            while(rs.next()) {
+                return rs.getInt(1);
+            }
+        }
+        return -1;
+    }
+
+    private static Connection getConnection() throws SQLException {
+        return DriverManager.getConnection(getUrl(),
+                PropertiesUtil.deepCopy(TEST_PROPERTIES));
+    }
+
+    @Before
+    public void beforeTest() {
+        incrementEdgeByOne();
+    }
+
+    @Test
+    public void testValidTimeRange() throws Exception {
+        String [] args = {"--deleteall", "--starttime", "1" , "--endtime", "9"};
+        runIndexTool(args, 0);
+        // all rows should be rebuilt
+        Assert.assertEquals(5, countRowsInIndex());
+    }
+
+    @Ignore("Until PHOENIX-5783 is fixed")
+    @Test
+    public void testValidTimeRange_startTimeInBetween() throws Exception {
+        String [] args = {"--deleteall", "--starttime", "6" , "--endtime", "9"};
+        runIndexTool(args, 0);
+        // only last 3 rows should be rebuilt
+        Assert.assertEquals(3, countRowsInIndex());
+    }
+
+    @Test
+    public void testValidTimeRange_endTimeInBetween() throws Exception {
+        String [] args = {"--deleteall", "--starttime", "1" , "--endtime", "6"};
+        runIndexTool(args, 0);
+        // only the first 2 rows (written at times 4 and 5) should be rebuilt; endtime is exclusive
+        Assert.assertEquals(2, countRowsInIndex());
+    }
+
+    @Test
+    public void testNoTimeRangePassed() throws Exception {
+        String [] args = {"--deleteall"};
+        runIndexTool(args, 0);
+        // all rows should be rebuilt
+        Assert.assertEquals(5, countRowsInIndex());
+    }
+
+    @Ignore("Until PHOENIX-5783 is fixed")
+    @Test
+    public void testValidTimeRange_onlyStartTimePassed() throws Exception {
+        // start time passed as the time of the last upsert
+        String [] args = {"--deleteall", "--starttime", "8"};
+        runIndexTool(args, 0);
+        Assert.assertEquals(1, countRowsInIndex());
+    }
+
+    @Test
+    public void testValidTimeRange_onlyEndTimePassed() throws Exception {
+        //end time passed as time of second upsert
+        String [] args = {"--deleteall", "--endtime", "5"};
+        runIndexTool(args, 0);
+        Assert.assertEquals(1, countRowsInIndex());
+    }
+
+    private void runIndexTool(String [] args, int expectedStatus) throws Exception {
+        IndexToolIT.runIndexTool(true, false, schemaName, dataTableName,
+                indexTableName, null, expectedStatus,
+                IndexTool.IndexVerifyType.NONE, args);
+    }
+
+    private static class CustomEnvironmentEdge extends EnvironmentEdge {
+        protected long value = 1L;
+
+        public void setValue(long newValue) {
+            value = newValue;
+        }
+
+        public void incrementValue(long addedValue) {
+            value += addedValue;
+        }
+
+        @Override
+        public long currentTime() {
+            return this.value;
+        }
+    }
+
+    @AfterClass
+    public static void teardown() {
+        tearDownMiniClusterAsync(2);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index d086fed..66d9313 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -39,8 +39,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getCurrentScnValue;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 /**
@@ -60,20 +64,22 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
     }
 
     @Override
-    protected  QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+    protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
             throws IOException {
         Preconditions.checkNotNull(context);
         if (queryPlan != null) {
             return queryPlan;
         }
         final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
-        final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        final String currentScnValue = getCurrentScnValue(configuration);
         final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Until PHOENIX-5783 is fixed, any configured start time is ignored and scans begin at time 0
+        final String startTimeValue = null;
         final Properties overridingProps = new Properties();
-        if(txnScnValue==null && currentScnValue!=null) {
+        if (txnScnValue==null && currentScnValue!=null) {
             overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+        if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null) {
             overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
         String dataTableFullName = getIndexToolDataTableName(configuration);
@@ -82,8 +88,9 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
         try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
             PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
             Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
-            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
-                    Long.toString(scn));
+            setCurrentScnValue(configuration, scn);
+
+            Long startTime = (startTimeValue == null) ? 0L : Long.valueOf(startTimeValue);
             PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
             ServerBuildIndexCompiler compiler =
                     new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
@@ -91,7 +98,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
             Scan scan = plan.getContext().getScan();
 
             try {
-                scan.setTimeRange(0, scn);
+                scan.setTimeRange(startTime, scn);
                 scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
                 scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE,
                         PhoenixConfigurationUtil.getIndexVerifyType(configuration).toBytes());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index c71c0ff..08e1564 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -89,16 +90,19 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.HConnectionFactory;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.EquiDepthStreamHistogram;
 import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket;
 import org.apache.phoenix.util.IndexUtil;
@@ -126,7 +130,7 @@ public class IndexTool extends Configured implements Tool {
         private String value;
         private byte[] valueBytes;
 
-        private IndexVerifyType(String value) {
+        IndexVerifyType(String value) {
             this.value = value;
             this.valueBytes = PVarchar.INSTANCE.toBytes(value);
         }
@@ -201,19 +205,21 @@ public class IndexTool extends Configured implements Tool {
     private String schemaName;
     private String dataTable;
     private String indexTable;
-    private boolean isPartialBuild;
+    private boolean isPartialBuild, isForeground;
     private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
     private String qDataTable;
     private String qIndexTable;
     private boolean useSnapshot;
-    private boolean isLocalIndexBuild;
+    private boolean isLocalIndexBuild = false;
     private boolean shouldDeleteBeforeRebuild;
-    private PTable pIndexTable;
+    private PTable pIndexTable = null;
     private PTable pDataTable;
-    private String tenantId;
+    private String tenantId = null;
     private Job job;
-
-
+    private Long startTime, endTime;
+    private IndexType indexType;
+    private String basePath;
+    byte[][] splitKeysBeforeJob = null;
 
     private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
             "Phoenix schema name (optional)");
@@ -272,8 +278,20 @@ public class IndexTool extends Configured implements Tool {
             + "If specified, truncates the index table and rebuilds (optional)");
 
     private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+    private static final Option START_TIME_OPTION = new Option("st", "starttime",
+            true, "Start time for indextool rebuild or verify");
+    private static final Option END_TIME_OPTION = new Option("et", "endtime",
+            true, "End time for indextool rebuild or verify");
+
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
 
+    public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
+            + "or equal to endTime "
+            + "or either of them are set in the future; IndexTool can't proceed.";
+
+    public static final String FEATURE_NOT_APPLICABLE = "starttime-endtime feature is only "
+            + "applicable for local or non-transactional global indexes";
+
     private Options getOptions() {
         final Options options = new Options();
         options.addOption(SCHEMA_NAME_OPTION);
@@ -289,9 +307,13 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(DELETE_ALL_AND_REBUILD_OPTION);
         options.addOption(HELP_OPTION);
         AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
-        options.addOption(AUTO_SPLIT_INDEX_OPTION);
         SPLIT_INDEX_OPTION.setOptionalArg(true);
+        START_TIME_OPTION.setOptionalArg(true);
+        END_TIME_OPTION.setOptionalArg(true);
+        options.addOption(AUTO_SPLIT_INDEX_OPTION);
         options.addOption(SPLIT_INDEX_OPTION);
+        options.addOption(START_TIME_OPTION);
+        options.addOption(END_TIME_OPTION);
         return options;
     }
 
@@ -301,7 +323,8 @@ public class IndexTool extends Configured implements Tool {
      * @param args supplied command line arguments
      * @return the parsed command line
      */
-    private CommandLine parseOptions(String[] args) {
+    @VisibleForTesting
+    public CommandLine parseOptions(String[] args) {
 
         final Options options = getOptions();
 
@@ -322,15 +345,18 @@ public class IndexTool extends Configured implements Tool {
                     + "parameter");
         }
         
-		if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
-			throw new IllegalStateException("Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
-		}
+        if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())
+                && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException("Index name should not be passed with "
+                    + PARTIAL_REBUILD_OPTION.getLongOpt());
+        }
 
-        if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+        if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())
+                && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
             throw new IllegalStateException("Index name should be passed unless it is a partial rebuild.");
         }
 
-		if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) {
+        if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) {
             throw new IllegalStateException(DELETE_ALL_AND_REBUILD_OPTION.getLongOpt() + " is not compatible with "
                     + PARTIAL_REBUILD_OPTION.getLongOpt());
         }
@@ -356,6 +382,14 @@ public class IndexTool extends Configured implements Tool {
         System.exit(exitCode);
     }
 
+    public Long getStartTime() {
+        return startTime;
+    }
+
+    public Long getEndTime() {
+        return endTime;
+    }
+
     class JobFactory {
         Connection connection;
         Configuration configuration;
@@ -366,13 +400,6 @@ public class IndexTool extends Configured implements Tool {
             this.connection = connection;
             this.configuration = configuration;
             this.outputPath = outputPath;
-
-        }
-
-        void closeConnection() throws SQLException {
-            if (this.connection != null) {
-                this.connection.close();
-            }
         }
 
         public Job getJob() throws Exception {
@@ -387,14 +414,19 @@ public class IndexTool extends Configured implements Tool {
                     configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name());
                 }
                 if (useSnapshot || (!isLocalIndexBuild && pDataTable.isTransactional())) {
-                    configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
-                            Long.toString(maxTimeRange));
+                    PhoenixConfigurationUtil.setCurrentScnValue(configuration, maxTimeRange);
                     return configureJobForAsyncIndex();
                 } else {
                     // Local and non-transactional global indexes to be built on the server side
                     // It is safe not to set CURRENT_SCN_VALUE for server side rebuilds, in order to make sure that
                     // all the rows that exist so far will be rebuilt. The current time of the servers will
                     // be used to set the time range for server side scans.
+
+                    // However, PHOENIX-5732 introduces endTime parameter to be passed optionally for IndexTool.
+                    // When endTime is passed for local and non-tx global indexes, we'll override the CURRENT_SCN_VALUE.
+                    if (endTime != null) {
+                        PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+                    }
                     return configureJobForServerBuildIndex();
                 }
             }
@@ -472,9 +504,9 @@ public class IndexTool extends Configured implements Tool {
             PhoenixConfigurationUtil.setDisableIndexes(configuration, StringUtils.join(",",disableIndexes));
             
             final Job job = Job.getInstance(configuration, jobName);
-			if (outputPath != null) {
-				FileOutputFormat.setOutputPath(job, outputPath);
-			}
+            if (outputPath != null) {
+                FileOutputFormat.setOutputPath(job, outputPath);
+            }
             job.setJarByClass(IndexTool.class);
             TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, PhoenixIndexPartialBuildMapper.class, null,
                     null, job);
@@ -491,14 +523,15 @@ public class IndexTool extends Configured implements Tool {
             for (String index : disableIndexes) {
                 quotedIndexes.add("'" + index + "'");
             }
-            ResultSet rs = connection.createStatement()
+            try (ResultSet rs = connection.createStatement()
                     .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " ("
                             + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " + TABLE_SCHEM
                             + (schemaName != null && schemaName.length() > 0 ? "='" + schemaName + "'" : " IS NULL")
-                            + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")");
-            if (rs.next()) {
-                maxRebuilAsyncDate = rs.getLong(1);
-                maxDisabledTimeStamp = rs.getLong(2);
+                            + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")")) {
+                if (rs.next()) {
+                    maxRebuilAsyncDate = rs.getLong(1);
+                    maxDisabledTimeStamp = rs.getLong(2);
+                }
             }
             // Do check if table is disabled again after user invoked async rebuilding during the run of the job
             if (maxRebuilAsyncDate > maxDisabledTimeStamp) {
@@ -507,7 +540,6 @@ public class IndexTool extends Configured implements Tool {
                 throw new RuntimeException(
                         "Inconsistent state we have one or more index tables which are disabled after the async is called!!");
             }
-            
         }
 
         private Job configureJobForAsyncIndex() throws Exception {
@@ -602,6 +634,9 @@ public class IndexTool extends Configured implements Tool {
 
             PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
             PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
+            if (startTime != null) {
+                PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
+            }
             PhoenixConfigurationUtil.setIndexVerifyType(configuration, indexVerifyType);
             String physicalIndexTable = pIndexTable.getPhysicalName().getString();
 
@@ -631,17 +666,6 @@ public class IndexTool extends Configured implements Tool {
             return configureSubmittableJobUsingDirectApi(job);
         }
 
-        /**
-         * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
-         * waits for the job completion based on runForeground parameter.
-         * 
-         * @param job
-         * @param outputPath
-         * @param runForeground - if true, waits for job completion, else submits and returns
-         *            immediately.
-         * @return
-         * @throws Exception
-         */
         private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception {
             job.setReducerClass(PhoenixIndexImportDirectReducer.class);
             Configuration conf = job.getConfiguration();
@@ -688,118 +712,26 @@ public class IndexTool extends Configured implements Tool {
 
     @Override
     public int run(String[] args) throws Exception {
-        Connection connection = null;
-        Table htable = null;
-        RegionLocator regionLocator = null;
-        JobFactory jobFactory = null;
-        org.apache.hadoop.hbase.client.Connection hConn = null;
+        CommandLine cmdLine;
         try {
-            CommandLine cmdLine = null;
-            try {
-                cmdLine = parseOptions(args);
-            } catch (IllegalStateException e) {
-                printHelpAndExit(e.getMessage(), getOptions());
-            }
-            final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
-            boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
-            tenantId = null;
-            if (useTenantId) {
-                tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
-                configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-            }
-
-            schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
-            dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
-            indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
-            isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
-            if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
-                String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
-                indexVerifyType = IndexVerifyType.fromValue(value);
-            }
-            qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
-            try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
-                pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
-            }
-            String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
-            boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
-            useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
-            shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt());
-
-            byte[][] splitKeysBeforeJob = null;
-            isLocalIndexBuild = false;
-            pIndexTable = null;
-
-            connection = ConnectionUtil.getInputConnection(configuration);
-            createIndexToolTables(connection);
-            
-            if (indexTable != null) {
-                if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
-                    throw new IllegalArgumentException(String.format(
-                        " %s is not an index table for %s for this connection", indexTable, qDataTable));
-                }
-                pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
-                        ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
-                if (schemaName != null && !schemaName.isEmpty()) {
-                    qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
-                } else {
-                    qIndexTable = indexTable;
-                }
-                htable = connection.unwrap(PhoenixConnection.class).getQueryServices()
-                        .getTable(pIndexTable.getPhysicalName().getBytes());
-                hConn = ConnectionFactory.createConnection(configuration);
-                regionLocator = hConn.getRegionLocator(
-                            TableName.valueOf(pIndexTable.getPhysicalName().getBytes()));
-                if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) {
-                    isLocalIndexBuild = true;
-                    splitKeysBeforeJob = regionLocator.getStartKeys();
-                }
-
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            return -1;
+        }
+        populateIndexToolAttributes(cmdLine);
+        Configuration configuration = getConfiguration(tenantId);
+        try (Connection conn = getConnection(configuration)) {
+            createIndexToolTables(conn);
+            if (dataTable != null && indexTable != null) {
+                setupIndexAndDataTable(conn);
+                checkTimeRangeFeature(startTime, endTime, pDataTable, isLocalIndexBuild);
                 if (shouldDeleteBeforeRebuild) {
-                   deleteBeforeRebuild(connection);
-                }
-
-                // presplit the index table
-                boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
-                boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables
-                if (!isSalted && IndexType.GLOBAL.equals(pIndexTable.getIndexType())
-                        && (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) {
-                    String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
-                    int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
-                    String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
-                    double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
-                    LOGGER.info(String.format("Will split index %s , autosplit=%s ," +
-                            " autoSplitNumRegions=%s , samplingRate=%s", indexTable,
-                            autosplit, autosplitNumRegions, samplingRate));
-                    splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, autosplitNumRegions, samplingRate, configuration);
+                    deleteBeforeRebuild(conn);
                 }
             }
-			Path outputPath = null;
-			FileSystem fs = null;
-			if (basePath != null) {
-				outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pIndexTable == null
-						? pDataTable.getPhysicalName().getString() : pIndexTable.getPhysicalName().getString());
-				fs = outputPath.getFileSystem(configuration);
-				fs.delete(outputPath, true);
-			}
-
-            // We have to mark Disable index to Building before we can set it to Active in the reducer. Otherwise it errors out with
-            // index state transition error
-			if (pIndexTable != null && pIndexTable.getIndexState() == PIndexState.DISABLE) {
-                IndexUtil.updateIndexState(connection.unwrap(PhoenixConnection.class),
-                        pIndexTable.getName().getString(), PIndexState.BUILDING, null);
-            }
-            jobFactory = new JobFactory(connection, configuration, outputPath);
-            job = jobFactory.getJob();
-
-            if (!isForeground) {
-                LOGGER.info("Running Index Build in Background - Submit async and exit");
-                job.submit();
-                return 0;
-            }
-            LOGGER.info("Running Index Build in Foreground. Waits for the build to complete." +
-                    " This may take a long time!.");
-            boolean result = job.waitForCompletion(true);
-            
+            preSplitIndexTable(cmdLine, conn);
+            boolean result = submitIndexToolJob(conn, configuration);
             if (result) {
                 return 0;
             } else {
@@ -808,64 +740,168 @@ public class IndexTool extends Configured implements Tool {
             }
         } catch (Exception ex) {
             LOGGER.error("An exception occurred while performing the indexing job: "
-                    + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
+                + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
             return -1;
-        } finally {
-            boolean rethrowException = false;
-            try {
-                if (connection != null) {
-                    try {
-                        connection.close();
-                    } catch (SQLException e) {
-                        LOGGER.error("Failed to close connection ", e);
-                        rethrowException = true;
-                    }
-                }
-                if (htable != null) {
-                    try {
-                        htable.close();
-                    } catch (IOException e) {
-                        LOGGER.error("Failed to close htable ", e);
-                        rethrowException = true;
-                    }
-                }
-                if (hConn != null) {
-                    try {
-                        hConn.close();
-                    } catch (IOException e) {
-                        LOGGER.error("Failed to close hconnection ", e);
-                        rethrowException = true;
-                    }
-                }
-                if (regionLocator != null) {
-                    try {
-                        regionLocator.close();
-                    } catch (IOException e) {
-                        LOGGER.error("Failed to close regionLocator ", e);
-                        rethrowException = true;
-                    }
-                }
-                if (jobFactory != null) {
-                    try {
-                        jobFactory.closeConnection();
-                    } catch (SQLException e) {
-                        LOGGER.error("Failed to close jobFactory ", e);
-                        rethrowException = true;
-                    }
-                }
-            } finally {
-                if (rethrowException) {
-                    throw new RuntimeException("Failed to close resource");
-                }
+        }
+    }
+
+    public static void checkTimeRangeFeature(Long startTime, Long endTime, PTable pDataTable, boolean isLocalIndexBuild) {
+        if (isTimeRangeSet(startTime, endTime) && !isTimeRangeFeatureApplicable(pDataTable, isLocalIndexBuild)) {
+            throw new RuntimeException(FEATURE_NOT_APPLICABLE);
+        }
+    }
+
+    private Configuration getConfiguration(String tenantId) {
+        final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+        if (tenantId != null) {
+            configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        }
+        return configuration;
+    }
+
+    private boolean submitIndexToolJob(Connection conn, Configuration configuration)
+            throws Exception {
+        Path outputPath = null;
+        FileSystem fs;
+        if (basePath != null) {
+            outputPath =
+                    CsvBulkImportUtil.getOutputPath(new Path(basePath),
+                            pIndexTable == null ?
+                                    pDataTable.getPhysicalName().getString() :
+                                    pIndexTable.getPhysicalName().getString());
+            fs = outputPath.getFileSystem(configuration);
+            fs.delete(outputPath, true);
+        }
+        JobFactory jobFactory = new JobFactory(conn, configuration, outputPath);
+        job = jobFactory.getJob();
+        if (!isForeground) {
+            LOGGER.info("Running Index Build in Background - Submit async and exit");
+            job.submit();
+            return true;
+        }
+        LOGGER.info("Running Index Build in Foreground. Waits for the build to complete."
+                + " This may take a long time!.");
+        return job.waitForCompletion(true);
+    }
+
+    @VisibleForTesting
+    public void populateIndexToolAttributes(CommandLine cmdLine) {
+        boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
+        boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
+        boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+        boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
+
+        if (useTenantId) {
+            tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+        }
+        if (useStartTime) {
+            startTime = Long.valueOf(cmdLine.getOptionValue(START_TIME_OPTION.getOpt())); // not the deprecated new Long(String)
+        }
+        if (useEndTime) {
+            endTime = Long.valueOf(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
+        }
+        if (isTimeRangeSet(startTime, endTime)) {
+            validateTimeRange(); // throws RuntimeException when the range is invalid
+        }
+        if (verify) {
+            String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+            indexVerifyType = IndexVerifyType.fromValue(value);
+        }
+        schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+        dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+        indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+        isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+        qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+        basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+        useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
+        shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt());
+    }
+
+    private void validateTimeRange() {
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        long st = (startTime == null) ? 0L : startTime; // an unset start time defaults to 0
+        long et = (endTime == null) ? currentTime : endTime; // an unset end time defaults to now
+        if (st > currentTime || et > currentTime || st >= et) { // reject future bounds and empty/inverted ranges
+            throw new RuntimeException(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        }
+    }
+
+    private Connection getConnection(Configuration configuration) throws SQLException {
+        return ConnectionUtil.getInputConnection(configuration);
+    }
+
+    private void setupIndexAndDataTable(Connection connection) throws SQLException, IOException {
+        pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
+        if (!isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
+            throw new IllegalArgumentException(
+                    String.format(" %s is not an index table for %s for this connection",
+                            indexTable, qDataTable));
+        }
+        pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
+                ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
+        indexType = pIndexTable.getIndexType();
+        if (schemaName != null && !schemaName.isEmpty()) {
+            qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
+        } else {
+            qIndexTable = indexTable;
+        }
+        if (IndexType.LOCAL.equals(indexType)) {
+            isLocalIndexBuild = true;
+            try (org.apache.hadoop.hbase.client.Connection hConn
+                    = getTemporaryHConnection(connection.unwrap(PhoenixConnection.class));
+                    RegionLocator regionLocator = hConn.getRegionLocator( // also auto-closed now
+                            TableName.valueOf(pIndexTable.getPhysicalName().getBytes()))) {
+                splitKeysBeforeJob = regionLocator.getStartKeys();
             }
         }
+        // A DISABLE index must be moved to BUILDING before the reducer can set it to ACTIVE;
+        // otherwise the reducer fails with an index state transition error.
+        changeDisabledIndexStateToBuiding(connection);
+    }
+
+    private static boolean isTimeRangeSet(Long startTime, Long endTime) {
+        return startTime != null || endTime != null;
+    }
+
+    private static boolean isTimeRangeFeatureApplicable(PTable dataTable, boolean isLocalIndexBuild) {
+        if (isLocalIndexBuild || !dataTable.isTransactional()) {
+            return true;
+        }
+        return false;
+    }
+
+    private void changeDisabledIndexStateToBuiding(Connection connection) throws SQLException {
+        if (pIndexTable != null && pIndexTable.getIndexState() == PIndexState.DISABLE) {
+            IndexUtil.updateIndexState(connection.unwrap(PhoenixConnection.class),
+                    pIndexTable.getName().getString(), PIndexState.BUILDING, null);
+        }
+    }
+
+    private void preSplitIndexTable(CommandLine cmdLine, Connection connection)
+            throws SQLException, IOException {
+        boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
+        boolean splitIndex = cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
+        boolean splittable = pIndexTable != null && pIndexTable.getBucketNum() == null; // no index resolved, or salted: nothing to split
+        if (splittable && IndexType.GLOBAL.equals(indexType) && (autosplit || splitIndex)) {
+            String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
+            int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
+            String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
+            double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
+            LOGGER.info(String.format("Will split index %s , autosplit=%s ,"
+                            + " autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit,
+                    autosplitNumRegions, samplingRate));
+
+            splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit,
+                    autosplitNumRegions, samplingRate);
+        }
     }
 
     private void deleteBeforeRebuild(Connection conn) throws SQLException, IOException {
         if (MetaDataUtil.isViewIndex(pIndexTable.getPhysicalName().getString())) {
             throw new IllegalArgumentException(String.format(
-                    "%s is a view index. delete-all-and-rebuild is not supported for view indexes",
-                    indexTable));
+                "%s is a view index. delete-all-and-rebuild is not supported for view indexes",
+                indexTable));
         }
 
         if (isLocalIndexBuild) {
@@ -881,16 +917,16 @@ public class IndexTool extends Configured implements Tool {
         }
     }
 
-    private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration)
-            throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+    private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit,
+            int autosplitNumRegions, double samplingRate)
+            throws SQLException, IOException, IllegalArgumentException {
         int numRegions;
 
-        try (org.apache.hadoop.hbase.client.Connection tempHConn =
-                ConnectionFactory.createConnection(configuration);
+        try (org.apache.hadoop.hbase.client.Connection tempHConn = getTemporaryHConnection(pConnection);
                 RegionLocator regionLocator =
                         tempHConn.getRegionLocator(TableName.valueOf(qDataTable))) {
             numRegions = regionLocator.getStartKeys().length;
-            if (autosplit && !(numRegions > autosplitNumRegions)) {
+            if (autosplit && (numRegions <= autosplitNumRegions)) {
                 LOGGER.info(String.format(
                     "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s",
                     pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions));
@@ -899,7 +935,7 @@ public class IndexTool extends Configured implements Tool {
         }
         // build a tablesample query to fetch index column values from the data table
         DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable);
-        String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate);
+        String qTableSample = String.format("%s TABLESAMPLE(%.2f)", qDataTable, samplingRate);
         List<String> dataColNames = colNames.getDataColNames();
         final String dataSampleQuery =
                 QueryUtil.constructSelectStatement(qTableSample, dataColNames, null,
@@ -936,6 +972,11 @@ public class IndexTool extends Configured implements Tool {
         }
     }
 
+    private org.apache.hadoop.hbase.client.Connection getTemporaryHConnection(PhoenixConnection pConnection)
+            throws SQLException, IOException { // returns a newly created connection; callers close it via try-with-resources
+        return ConnectionFactory.createConnection(pConnection.getQueryServices().getAdmin().getConfiguration()); // NOTE(review): the Admin from getAdmin() is never closed here - confirm whether it leaks
+    }
+
     // setup a ValueGetter to get index values from the ResultSet
     private ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) {
         // map from data col name to index in ResultSet
@@ -944,7 +985,7 @@ public class IndexTool extends Configured implements Tool {
         for (String dataCol : dataColNames) {
             rsIndex.put(SchemaUtil.getEscapedFullColumnName(dataCol), i++);
         }
-        ValueGetter getter = new ValueGetter() {
+        return new ValueGetter() {
             final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
             final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable();
 
@@ -968,19 +1009,6 @@ public class IndexTool extends Configured implements Tool {
                 return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr);
             }
         };
-        return getter;
-    }
-
-    private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, RegionLocator regionLocator) throws Exception {
-        if (splitKeysBeforeJob != null
-                && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) {
-            String errMsg = "The index to build is local index and the split keys are not matching"
-                    + " before and after running the job. Please rerun the job otherwise"
-                    + " there may be inconsistencies between actual data and index data";
-            LOGGER.error(errMsg);
-            throw new Exception(errMsg);
-        }
-        return true;
     }
 
     /**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 196c842..6331bf8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -163,6 +163,8 @@ public final class PhoenixConfigurationUtil {
     public static final String RESTORE_DIR_KEY = "phoenix.tableSnapshot.restore.dir";
 
     public static final String MAPREDUCE_TENANT_ID = "phoenix.mapreduce.tenantid";
+    private static final String INDEX_TOOL_END_TIME = "phoenix.mr.index.endtime";
+    private static final String INDEX_TOOL_START_TIME = "phoenix.mr.index.starttime";
 
     public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
 
@@ -279,6 +281,28 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(restoreDir);
         configuration.set(RESTORE_DIR_KEY, restoreDir);
     }
+
+    public static void setIndexToolStartTime(Configuration configuration, Long startTime) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(startTime);
+        configuration.set(INDEX_TOOL_START_TIME, Long.toString(startTime));
+    }
+
+    public static void setCurrentScnValue(Configuration configuration, Long scn) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(scn);
+        configuration.set(CURRENT_SCN_VALUE, Long.toString(scn));
+    }
+
+    public static String getIndexToolStartTime(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INDEX_TOOL_START_TIME);
+    }
+
+    public static String getCurrentScnValue(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(CURRENT_SCN_VALUE);
+    }
     
     public static List<String> getUpsertColumnNames(final Configuration configuration) {
         return getValues(configuration, MAPREDUCE_UPSERT_COLUMN_COUNT, MAPREDUCE_UPSERT_COLUMN_VALUE_PREFIX);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
new file mode 100644
index 0000000..d82e99a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE;
+import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
+import static org.mockito.Mockito.when;
+
+public class IndexToolTest extends BaseTest {
+
+    IndexTool it;
+    private String dataTable;
+    private String indexTable;
+    private String schema;
+    private String tenantId;
+    @Mock
+    PTable pDataTable;
+    boolean localIndex = true;
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    @Before
+    public void setup() {
+        it = new IndexTool();
+        schema = generateUniqueName();
+        dataTable = generateUniqueName();
+        indexTable = generateUniqueName();
+        tenantId = generateUniqueName();
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_timeRangeNotNull() {
+        Long startTime = 10L;
+        Long endTime = 15L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertEquals(startTime, it.getStartTime());
+        Assert.assertEquals(endTime, it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_null() {
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertNull(it.getStartTime());
+        Assert.assertNull(it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeNotNull() {
+        Long startTime = 10L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , null);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertEquals(startTime, it.getStartTime());
+        Assert.assertEquals(null, it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_endTimeNotNull() {
+        Long endTime = 15L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        null , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        it.populateIndexToolAttributes(cmdLine);
+        Assert.assertEquals(null, it.getStartTime());
+        Assert.assertEquals(endTime, it.getEndTime());
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeNullEndTimeInFuture() {
+        Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        null , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_endTimeNullStartTimeInFuture() {
+        Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , null);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test(timeout = 10000 /* 10 secs */)
+    public void testParseOptions_timeRange_startTimeInFuture() {
+        Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 200000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test(timeout = 10000 /* 10 secs */)
+    public void testParseOptions_timeRange_endTimeInFuture() {
+        Long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeEqEndTime() {
+        Long startTime = 10L;
+        Long endTime = 10L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testParseOptions_timeRange_startTimeGtEndTime() {
+        Long startTime = 10L;
+        Long endTime = 1L;
+        String [] args =
+                IndexToolIT.getArgValues(true, true, schema,
+                        dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+                        startTime , endTime);
+        CommandLine cmdLine = it.parseOptions(args);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
+        it.populateIndexToolAttributes(cmdLine);
+    }
+
+    @Test
+    public void testCheckTimeRangeFeature_timeRangeSet_transactionalTable_globalIndex() {
+        when(pDataTable.isTransactional()).thenReturn(true);
+        exceptionRule.expect(RuntimeException.class);
+        exceptionRule.expectMessage(FEATURE_NOT_APPLICABLE);
+        IndexTool.checkTimeRangeFeature(1L, 3L, pDataTable, !localIndex);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index cb5efa1..f58605f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -286,4 +287,18 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
 
     }
 
+    @Test
+    public void testTimeRangeOverride() {
+        final Configuration configuration = new Configuration();
+        Long startTime = 1L;
+        Long endTime = 2L;
+
+        PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
+        PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+        Assert.assertEquals(startTime.longValue(),
+                Long.parseLong(PhoenixConfigurationUtil.getIndexToolStartTime(configuration)));
+        Assert.assertEquals(endTime.longValue(),
+                Long.parseLong(PhoenixConfigurationUtil.getCurrentScnValue(configuration)));
+
+    }
 }