Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/12/16 02:06:37 UTC

[phoenix] branch 4.x updated: PHOENIX-5592 MapReduce job to asynchronously delete rows where the VIEW_TTL has expired

This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 5877936  PHOENIX-5592 MapReduce job to asynchronously delete rows where the VIEW_TTL has expired
5877936 is described below

commit 5877936257e0356f47eedd25346234230822e115
Author: Xinyi Yan <xy...@salesforce.com>
AuthorDate: Wed Nov 18 21:49:47 2020 -0800

    PHOENIX-5592 MapReduce job to asynchronously delete rows where the VIEW_TTL has expired
    
    Signed-off-by: Xinyi Yan <ya...@apache.org>
---
 .../DefaultPhoenixMultiViewListProviderIT.java     | 205 +++++
 .../apache/phoenix/end2end/PhoenixTTLToolIT.java   | 848 +++++++++++++++++++++
 .../mapreduce/PhoenixMultiViewInputFormat.java     | 113 +++
 .../mapreduce/PhoenixMultiViewInputSplit.java      |  77 ++
 .../phoenix/mapreduce/PhoenixMultiViewReader.java  |  86 +++
 .../mapreduce/PhoenixTTLDeleteJobMapper.java       | 241 ++++++
 .../apache/phoenix/mapreduce/PhoenixTTLTool.java   | 320 ++++++++
 .../util/DefaultMultiViewJobStatusTracker.java     |  50 ++
 .../util/DefaultMultiViewSplitStrategy.java        |  79 ++
 .../util/DefaultPhoenixMultiViewListProvider.java  | 200 +++++
 .../mapreduce/util/MultiViewJobStatusTracker.java  |  25 +
 .../mapreduce/util/MultiViewSplitStrategy.java     |  27 +
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  55 +-
 .../mapreduce/util/PhoenixMapReduceUtil.java       |  17 +
 .../mapreduce/util/PhoenixMultiInputUtil.java      | 114 +++
 .../util/PhoenixMultiViewListProvider.java         |  25 +
 .../phoenix/mapreduce/util/ViewInfoTracker.java    | 108 +++
 .../phoenix/mapreduce/util/ViewInfoWritable.java   |  52 ++
 .../org/apache/phoenix/query/QueryServices.java    |   1 -
 .../DefaultMultiViewSplitStrategyTest.java         | 110 +++
 .../mapreduce/PhoenixMultiViewInputFormatTest.java |  86 +++
 .../mapreduce/PhoenixMultiViewReaderTest.java      |  86 +++
 .../phoenix/mapreduce/PhoenixTTLToolTest.java      |  81 ++
 23 files changed, 2999 insertions(+), 7 deletions(-)
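
For reference, the integration tests in this commit drive the new tool programmatically
through PhoenixTTLTool.setConf() and PhoenixTTLTool.run(). Below is a minimal sketch of
that invocation pattern, distilled from the tests; the flags and the zero exit status
come from this commit, while the wrapper class name is illustrative and the empty
Configuration is only a placeholder for real cluster settings:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.PhoenixTTLTool;

    public class PhoenixTTLToolUsageSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder: real HBase/Phoenix settings would normally come from
            // hbase-site.xml on the classpath rather than an empty Configuration.
            Configuration conf = new Configuration();

            PhoenixTTLTool tool = new PhoenixTTLTool();
            tool.setConf(conf);

            // "-runfg" runs the MapReduce job in the foreground; "-a" targets all
            // views with a PHOENIX_TTL value. The tests below also pass
            // "-v <viewName>" and "-i <tenantId>" to target a single view or tenant.
            int status = tool.run(new String[] { "-runfg", "-a" });
            if (status != 0) {
                throw new RuntimeException("PhoenixTTL delete job exited with " + status);
            }
        }
    }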

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
new file mode 100644
index 0000000..94b8b56
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.PhoenixTTLTool.DELETE_ALL_VIEWS;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE;
+import static org.junit.Assert.assertEquals;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class DefaultPhoenixMultiViewListProviderIT extends ParallelStatsDisabledIT {
+    private final String BASE_TABLE_DDL = "CREATE TABLE %s (TENANT_ID CHAR(10) NOT NULL, " +
+            "ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+            "PK PRIMARY KEY (TENANT_ID,ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+    private final String VIEW_DDL = "CREATE VIEW %s (" +
+            "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+            " AS SELECT * FROM %s ";
+    private final String VIEW_DDL_WITH_TTL =
+            VIEW_DDL + " PHOENIX_TTL = 1000";
+    private final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
+    private final String TENANT_VIEW_DDL =
+            "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+    private final String TENANT_VIEW_DDL_WITH_TTL = TENANT_VIEW_DDL + " PHOENIX_TTL = 1000";
+
+    @Test
+    public void testGetPhoenixMultiViewList() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName1 = schema + "." + generateUniqueName();
+        String tenantViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName3 = schema + "." + generateUniqueName();
+        String tenantViewName4 = schema + "." + generateUniqueName();
+        String indexTable1 = generateUniqueName() + "_IDX";
+        String indexTable2 = generateUniqueName() + "_IDX";
+        String indexTable3 = generateUniqueName() + "_IDX";
+        String indexTable4 = generateUniqueName() + "_IDX";
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+        DefaultPhoenixMultiViewListProvider defaultPhoenixMultiViewListProvider =
+                new DefaultPhoenixMultiViewListProvider();
+        Configuration cloneConfig = PropertiesUtil.cloneConfig(config);
+
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS,
+                DELETE_ALL_VIEWS);
+        cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE,"2");
+        List<ViewInfoWritable> result =
+                defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+
+        /*
+            Case 1 : no view
+         */
+        assertEquals(0, result.size());
+
+         /*
+            Case 2 :
+                    BaseMultiTenantTable
+                  GlobalView1 with TTL(1 ms)
+                Index1                 Index2
+                TenantView1,   TenantView2
+        */
+        try (Connection globalConn = DriverManager.getConnection(url);
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant2)) {
+
+            globalConn.createStatement().execute(String.format(BASE_TABLE_DDL, baseTableFullName));
+            globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_TTL,
+                    globalViewName1, baseTableFullName));
+
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable1, globalViewName1, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable2, globalViewName1, "C,D"));
+
+            tenant1Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL,tenantViewName1, globalViewName1));
+            tenant2Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL,tenantViewName2, globalViewName1));
+        }
+
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        // a view with 2 view indexes issues 3 deletion jobs:
+        // 1 for the data table and 2 for the index tables.
+        assertEquals(3, result.size());
+
+         /*
+            Case 3: globalView2 without TTL
+                                     BaseMultiTenantTable
+             GlobalView1 with TTL(1 ms)            GlobalView2 without TTL
+        Index1                 Index2            Index3                    Index4
+            TenantView1,   TenantView2
+        */
+        try (Connection globalConn = DriverManager.getConnection(url)) {
+            globalConn.createStatement().execute(String.format(VIEW_DDL,
+                    globalViewName2, baseTableFullName));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable3, globalViewName2, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable4, globalViewName2, "C,D"));
+        }
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(3, result.size());
+
+         /*
+            Case 4: adding TenantView3 (with TTL) and TenantView4 (without TTL)
+                                     BaseMultiTenantTable
+             GlobalView1 with TTL(1 ms)            GlobalView2 without TTL
+        Index1                 Index2            Index3                    Index4
+            TenantView1,   TenantView2          TenantView3 with TTL,   TenantView4 without TTL
+        */
+
+        try (Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant2)) {
+            tenant1Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL_WITH_TTL,tenantViewName3, globalViewName2));
+            tenant2Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL,tenantViewName4, globalViewName2));
+        }
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(6, result.size());
+
+        /*
+            Testing the tenant-specific case. Even though tenant1 created 2 leaf views,
+            the one created under a global view with TTL is not added to the deletion list.
+         */
+        cloneConfig = PropertiesUtil.cloneConfig(config);
+        cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE,"2");
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(3, result.size());
+
+        /*
+            Deleting tenant1 with tenantViewName1 will NOT add any deletion job to the list
+            because the parent global view has a TTL value.
+         */
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName1);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(0, result.size());
+
+        /*
+            Without a tenant id, the job is not added to the list even if the tenant view
+            name is provided.
+         */
+        cloneConfig = PropertiesUtil.cloneConfig(config);
+        cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE,"2");
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName3);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(0, result.size());
+
+        /*
+            tenant id + tenant view name will add 3 jobs to the list:
+            1 for the data table and 2 for the index tables.
+         */
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName3);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(3, result.size());
+
+        /*
+            tenant id + tenant view name will NOT add to the list because tenantViewName4
+            does NOT have a TTL value.
+         */
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant2);
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName4);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(0, result.size());
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
new file mode 100644
index 0000000..d1d101f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
@@ -0,0 +1,848 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final long PHOENIX_TTL_EXPIRE_IN_A_SECOND = 1;
+    private final long MILLISECOND = 1000;
+    private final long PHOENIX_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private final String VIEW_PREFIX1 = "V01";
+    private final String VIEW_PREFIX2 = "V02";
+    private final String UPSERT_TO_GLOBAL_VIEW_QUERY =
+            "UPSERT INTO %s (PK1,A,B,C,D) VALUES(1,1,1,1,1)";
+    private final String UPSERT_TO_LEAF_VIEW_QUERY =
+            "UPSERT INTO %s (PK1,A,B,C,D,E,F) VALUES(1,1,1,1,1,1,1)";
+    private final String VIEW_DDL_WITH_ID_PREFIX_AND_TTL = "CREATE VIEW %s (" +
+            "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+            " AS SELECT * FROM %s WHERE ID = '%s' PHOENIX_TTL = %d";
+    private final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
+    private final String TENANT_VIEW_DDL =
+            "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+
+    private void verifyNumberOfRowsFromHBaseLevel(String tableName, String regex, int expectedRows)
+            throws Exception {
+        try (Table table = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+                .getTable(SchemaUtil.getTableNameAsBytes(
+                        SchemaUtil.getSchemaNameFromFullName(tableName),
+                        SchemaUtil.getTableNameFromFullName(tableName)))) {
+            Filter filter =
+                    new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        long numMatchingRows = 0;
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                numMatchingRows++;
+            }
+        }
+        return numMatchingRows;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID,ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /*
+                    BaseMultiTenantTable
+                  GlobalView1 with TTL(1 ms)
+                Index1                 Index2
+
+        Create 2 tenant views and upsert data.
+        After running the MR job, all data should be deleted.
+     */
+    @Test
+    public void testTenantViewOnGlobalViewWithMoreThanOneIndex() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String indexTable1 = generateUniqueName() + "_IDX";
+        String indexTable2 = generateUniqueName() + "_IDX";
+        String globalViewName = schema + "." + generateUniqueName();
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+        String tenantView1 = schema + "." + generateUniqueName();
+        String tenantView2 = schema + "." + generateUniqueName();
+        String indexTable = "_IDX_" + baseTableFullName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, baseTableFullName);
+            globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
+                    globalViewName, baseTableFullName, VIEW_PREFIX1,
+                    PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable1, globalViewName, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable2, globalViewName, "C,D"));
+
+            tenant1Connection.setAutoCommit(true);
+            tenant2Connection.setAutoCommit(true);
+
+            tenant1Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL,tenantView1, globalViewName));
+            tenant2Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL,tenantView2, globalViewName));
+
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView1));
+            verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView2));
+
+            // wait for the row to expire and the indexes to be updated
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+            verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+
+            // the view has 2 view indexes, so upserting 1 row into the base table
+            // results in 2 rows in the index table
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 0, globalConn);
+            verifyNumberOfRows(baseTableFullName, tenant2, 0, globalConn);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 0);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 0);
+        }
+    }
+
+    /*
+                                     BaseMultiTenantTable
+             GlobalView1 with TTL(1 ms)            GlobalView2 with TTL(1 DAY)
+        Index1                 Index2            Index3                    Index4
+
+        Upsert data to both global views, then run the MR job.
+        It should delete only GlobalView1's data and leave GlobalView2's data intact.
+     */
+    @Test
+    public void testGlobalViewWithMoreThanOneIndex() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String indexTable1 = generateUniqueName() + "_IDX";
+        String indexTable2 = generateUniqueName() + "_IDX";
+        String indexTable3 = generateUniqueName() + "_IDX";
+        String indexTable4 = generateUniqueName() + "_IDX";
+        String indexTable = "_IDX_" + baseTableFullName;
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, baseTableFullName);
+
+            globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
+                    globalViewName1, baseTableFullName, VIEW_PREFIX1,
+                    PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
+                    globalViewName2, baseTableFullName, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable1, globalViewName1, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable2, globalViewName1, "C,D"));
+
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable3, globalViewName2, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable4, globalViewName2, "C,D"));
+
+            tenant1Connection.setAutoCommit(true);
+            tenant2Connection.setAutoCommit(true);
+
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+            // wait for the row to expire and the indexes to be updated
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
+            verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
+
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+            verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
+        }
+    }
+
+    /*
+                                    BaseMultiTenantTable
+            GlobalView1 with TTL(1 ms)            GlobalView2 with TTL(1 DAY)
+       Index1                 Index2            Index3                    Index4
+                TenantView1                                 TenantView2
+
+       Upsert data through the tenant views, then run the MR job.
+       It should delete only GlobalView1's data and leave GlobalView2's data intact.
+    */
+    @Test
+    public void testTenantViewCase() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName1 = schema + "." + generateUniqueName();
+        String tenantViewName2 = schema + "." + generateUniqueName();
+        String indexTable1 = generateUniqueName() + "_IDX";
+        String indexTable2 = generateUniqueName() + "_IDX";
+        String indexTable3 = generateUniqueName() + "_IDX";
+        String indexTable4 = generateUniqueName() + "_IDX";
+        String indexTable = "_IDX_" + baseTableFullName;
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, baseTableFullName);
+            String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName1, VIEW_PREFIX1,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+            ddl = "CREATE INDEX %s ON %s(%s)";
+
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable1, globalViewName1, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable2, globalViewName1, "C,D"));
+
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable3, globalViewName2, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable4, globalViewName2, "C,D"));
+
+            ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName1, globalViewName1));
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName2, globalViewName2));
+
+            tenant2Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName1, globalViewName1));
+            tenant2Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName2, globalViewName2));
+
+            tenant1Connection.setAutoCommit(true);
+            tenant2Connection.setAutoCommit(true);
+
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
+
+            // wait for the row to expire and the indexes to be updated
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
+            verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
+
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+            verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
+        }
+    }
+
+    /*
+                                     BaseMultiTenantTable
+             GlobalView1 with TTL(1 ms)            GlobalView2 with TTL(1 DAY)
+        Upsert data to both global views, then run the MR job.
+        It should delete only GlobalView1's data and leave GlobalView2's data intact.
+     */
+    @Test
+    public void testGlobalViewWithNoIndex() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, baseTableFullName);
+            String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName1, VIEW_PREFIX1,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+            tenant1Connection.setAutoCommit(true);
+            tenant2Connection.setAutoCommit(true);
+
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+            // wait for the row to expire
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
+            verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+            verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
+        }
+    }
+
+    /*
+                                        BaseTable
+            GlobalView1 with TTL(1 ms)            GlobalView2 with TTL(1 DAY)
+       Upsert data to both global views, then run the MR job.
+       It should delete only GlobalView1's data and leave GlobalView2's data intact.
+    */
+    @Test
+    public void testGlobalViewOnNonMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + baseTableFullName  +
+                    " (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)";
+            globalConn.createStatement().execute(ddl);
+
+            ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
+
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName1, VIEW_PREFIX1,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+            globalConn.setAutoCommit(true);
+
+            globalConn.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
+            globalConn.createStatement().execute(
+                    String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
+
+            // wait for the row to expire
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+        }
+    }
+
+    /*
+                                        BaseTable
+            GlobalView1 with TTL(1 ms)            GlobalView2 with TTL(1 DAY)
+        Index1                 Index2            Index3                    Index4
+
+       Upsert data to both global views, then run the MR job.
+       It should delete only GlobalView1's data and leave GlobalView2's data intact.
+    */
+    @Test
+    public void testGlobalViewOnNonMultiTenantTableWithIndex() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String indexTable1 = generateUniqueName() + "_IDX";
+        String indexTable2 = generateUniqueName() + "_IDX";
+        String indexTable3 = generateUniqueName() + "_IDX";
+        String indexTable4 = generateUniqueName() + "_IDX";
+        String indexTable = "_IDX_" + baseTableFullName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            String ddl = "CREATE TABLE " + baseTableFullName  +
+                    " (PK1 BIGINT NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                    "PK PRIMARY KEY (PK1,ID))";
+            globalConn.createStatement().execute(ddl);
+
+            ddl = "CREATE VIEW %s (PK2 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE PK1=%d PHOENIX_TTL = %d";
+
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName2, 2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+            ddl = "CREATE INDEX %s ON %s(%s)";
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable1, globalViewName1, "A,ID,B"));
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable2, globalViewName1, "C,ID,D"));
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable3, globalViewName2, "A,ID,B"));
+            globalConn.createStatement().execute(
+                    String.format(ddl, indexTable4, globalViewName2, "C,ID,D"));
+
+            globalConn.setAutoCommit(true);
+
+            String query = "UPSERT INTO %s (PK2,A,B,C,D,ID) VALUES(1,1,1,1,1,'%s')";
+            globalConn.createStatement().execute(
+                    String.format(query, globalViewName1, VIEW_PREFIX1));
+            globalConn.createStatement().execute(
+                    String.format(query, globalViewName2, VIEW_PREFIX2));
+
+            // wait for the row to expire and the indexes to be updated
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX1 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX2 + ".*", 2);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX1 + ".*", 0);
+            verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX2 + ".*", 2);
+        }
+    }
+
+    /*
+                                     BaseMultiTenantTable
+                                        GlobalView1
+                             TenantView1         TenantView2
+     */
+    @Test
+    public void testDeleteByViewAndTenant() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String tenantViewName1 = schema + "." + generateUniqueName();
+        String tenantViewName2 = schema + "." + generateUniqueName();
+        String tenant1 = generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1)) {
+
+            createMultiTenantTable(globalConn, baseTableFullName);
+            String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = 1";
+
+            globalConn.createStatement().execute(String.format(ddl, globalViewName1));
+
+            ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
+                    "WHERE ID = '%s' PHOENIX_TTL = %d";
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName2, globalViewName1, VIEW_PREFIX2,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+            tenant1Connection.setAutoCommit(true);
+
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
+
+            // wait for the row to expire
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-v", tenantViewName2, "-i", tenant1});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 0);
+        }
+    }
+
+    /*
+                                     BaseMultiTenantTable
+                               GlobalView1              GlobalView2
+                         TenantView1  TenantView3   TenantView2  TenantView4
+     */
+    @Test
+    public void testDeleteByTenant() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName1 = schema + "." + generateUniqueName();
+        String tenantViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName3 = schema + "." + generateUniqueName();
+        String tenantViewName4 = schema + "." + generateUniqueName();
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, baseTableFullName);
+            String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d";
+
+            globalConn.createStatement().execute(String.format(ddl, globalViewName1, 1));
+            globalConn.createStatement().execute(String.format(ddl, globalViewName2, 2));
+
+            ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
+                    "WHERE ID = '%s' PHOENIX_TTL = %d";
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName2, globalViewName2, VIEW_PREFIX2,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+            tenant2Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName3, globalViewName1, VIEW_PREFIX1,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            tenant2Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName4, globalViewName2, VIEW_PREFIX2,
+                            PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+            tenant1Connection.setAutoCommit(true);
+            tenant2Connection.setAutoCommit(true);
+
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName3));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName4));
+
+            // wait for the row to expire
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant1 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant2 + ".*", 2);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-i", tenant1});
+            assertEquals(0, status);
+
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant2 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant1 + ".*", 0);
+        }
+    }
+
+    /*
+                                     BaseMultiTenantTable
+                               GlobalView1              GlobalView2
+                         TenantView1  TenantView3   TenantView2  TenantView4
+     */
+    @Test
+    public void testDeleteByViewName() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName1 = schema + "." + generateUniqueName();
+        String tenantViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName3 = schema + "." + generateUniqueName();
+        String tenantViewName4 = schema + "." + generateUniqueName();
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, baseTableFullName);
+            String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d PHOENIX_TTL = %d";
+
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            globalConn.createStatement().execute(
+                    String.format(ddl, globalViewName2, 2, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+
+            ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s WHERE ID = '%s'";
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1));
+            tenant1Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName2, globalViewName2, VIEW_PREFIX2));
+
+            tenant2Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName3, globalViewName1, VIEW_PREFIX1));
+            tenant2Connection.createStatement().execute(
+                    String.format(ddl, tenantViewName4, globalViewName2, VIEW_PREFIX2));
+
+            tenant1Connection.setAutoCommit(true);
+            tenant2Connection.setAutoCommit(true);
+
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
+            tenant1Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName3));
+            tenant2Connection.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName4));
+
+            // wait for the row to expire
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 2);
+
+            // running MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-v", globalViewName1});
+            assertEquals(0, status);
+
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 2);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
+        }
+    }
+
+    /*
+                                             BaseTable
+                                         GlobalView1 with TTL
+               MiddleLevelView1 with TTL(1 ms)    MiddleLevelView2 with TTL(1 DAY)
+                    LeafView1                             LeafView2
+       Upsert data to both leaf views, then run the MR job.
+       It should delete only MiddleLevelView1's data and leave MiddleLevelView2's data intact.
+    */
+    @Test
+    public void testCleanMoreThanThreeLevelViewCase() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName = schema + "." + generateUniqueName();
+        String middleLevelViewName1 = schema + "." + generateUniqueName();
+        String middleLevelViewName2 = schema + "." + generateUniqueName();
+        String leafViewName1 = schema + "." + generateUniqueName();
+        String leafViewName2 = schema + "." + generateUniqueName();
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl())) {
+            String baseTableDdl = "CREATE TABLE " + baseTableFullName  +
+                    " (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)";
+            globalConn.createStatement().execute(baseTableDdl);
+
+            String globalViewDdl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
+                    "A BIGINT, B BIGINT)" + " AS SELECT * FROM " + baseTableFullName;
+
+            globalConn.createStatement().execute(String.format(globalViewDdl, globalViewName));
+
+            String middleLevelViewDdl = "CREATE VIEW %s (C BIGINT, D BIGINT)" +
+                    " AS SELECT * FROM %s WHERE ID ='%s' PHOENIX_TTL = %d";
+
+            globalConn.createStatement().execute(String.format(middleLevelViewDdl,
+                    middleLevelViewName1, globalViewName,
+                    VIEW_PREFIX1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
+            globalConn.createStatement().execute(String.format(middleLevelViewDdl,
+                    middleLevelViewName2, globalViewName, VIEW_PREFIX2,
+                    PHOENIX_TTL_EXPIRE_IN_A_DAY));
+
+            String leafViewDdl = "CREATE VIEW %s (E BIGINT, F BIGINT)" +
+                    " AS SELECT * FROM %s";
+
+            globalConn.createStatement().execute(String.format(leafViewDdl,
+                    leafViewName1, middleLevelViewName1));
+            globalConn.createStatement().execute(String.format(leafViewDdl,
+                    leafViewName2, middleLevelViewName2));
+
+            globalConn.setAutoCommit(true);
+
+            globalConn.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, leafViewName1));
+            globalConn.createStatement().execute(
+                    String.format(UPSERT_TO_LEAF_VIEW_QUERY, leafViewName2));
+
+            // wait for the row to expire
+            Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
+
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+
+            // run the MR job to delete expired rows.
+            PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            phoenixTtlTool.setConf(conf);
+            int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
+            verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
+        }
+    }
+
+    @Test
+    public void testNoViewCase() throws Exception {
+        PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
+        Configuration conf = new Configuration(getUtility().getConfiguration());
+        phoenixTtlTool.setConf(conf);
+        int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
+        assertEquals(0, status);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java
new file mode 100644
index 0000000..9d7da2e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.mapreduce.util.MultiViewSplitStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+    Generic MultiViewInputFormat class used by the MR job. You can provide your own
+    view list provider and split strategy classes to customize the behavior for your
+    business needs by setting the classes below:
+        MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ
+        MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ
+ */
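+/*
+    A minimal sketch of how custom classes could be plugged in (the com.example names
+    are placeholders, not part of this patch):
+        Configuration conf = job.getConfiguration();
+        conf.set(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ,
+                "com.example.MyViewListProvider");
+        conf.set(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ,
+                "com.example.MySplitStrategy");
+ */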
+public class PhoenixMultiViewInputFormat<T extends Writable> extends InputFormat<NullWritable,T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMultiViewInputFormat.class);
+
+    @Override public List<InputSplit> getSplits(JobContext context) throws IOException {
+        List<InputSplit> listOfInputSplit;
+        try {
+            final Configuration configuration = context.getConfiguration();
+            Class<?> defaultMultiInputStrategyClazz = DefaultPhoenixMultiViewListProvider.class;
+            if (configuration.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ) != null) {
+                defaultMultiInputStrategyClazz = Class.forName(configuration.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ));
+            }
+            PhoenixMultiViewListProvider phoenixMultiViewListProvider =
+                    (PhoenixMultiViewListProvider) defaultMultiInputStrategyClazz.newInstance();
+            List<ViewInfoWritable> views =
+                    phoenixMultiViewListProvider.getPhoenixMultiViewList(configuration);
+
+            Class<?> defaultDeletionMultiInputSplitStrategyClazz =
+                    DefaultMultiViewSplitStrategy.class;
+            if (configuration.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ) != null) {
+                defaultDeletionMultiInputSplitStrategyClazz = Class.forName(configuration.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ));
+            }
+            MultiViewSplitStrategy multiViewSplitStrategy = (MultiViewSplitStrategy)
+                    defaultDeletionMultiInputSplitStrategyClazz.newInstance();
+            listOfInputSplit = multiViewSplitStrategy.generateSplits(views, configuration);
+        } catch (ClassNotFoundException e) {
+            LOGGER.debug("PhoenixMultiViewInputFormat is getting ClassNotFoundException : " +
+                    e.getMessage());
+            throw new IOException(
+                    "PhoenixMultiViewInputFormat is getting ClassNotFoundException : " +
+                            e.getMessage(), e.getCause());
+        } catch (InstantiationException e) {
+            LOGGER.debug("PhoenixMultiViewInputFormat is getting InstantiationException : " +
+                    e.getMessage());
+            throw new IOException(
+                    "PhoenixMultiViewInputFormat is getting InstantiationException : " +
+                            e.getMessage(), e.getCause());
+        } catch (IllegalAccessException e) {
+            LOGGER.debug("PhoenixMultiViewInputFormat is getting IllegalAccessException : " +
+                    e.getMessage());
+            throw new IOException(
+                    "PhoenixMultiViewInputFormat is getting IllegalAccessException : " +
+                            e.getMessage(), e.getCause());
+        }
+
+        return listOfInputSplit == null ? new ArrayList<InputSplit>() : listOfInputSplit;
+    }
+
+    @Override
+    public RecordReader<NullWritable,T> createRecordReader(InputSplit split,
+                                                           TaskAttemptContext context) {
+        final Configuration configuration = context.getConfiguration();
+
+        final Class<T> inputClass =
+                (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
+        return getPhoenixRecordReader(inputClass, configuration);
+    }
+
+    private RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass,
+                                                                Configuration configuration) {
+        return new PhoenixMultiViewReader<>(inputClass , configuration);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
new file mode 100644
index 0000000..37f4218
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+    Generic input split that carries the list of views for the MR job. You can override
+    it with your own logic to filter or add views.
+ */
+public class PhoenixMultiViewInputSplit extends InputSplit implements Writable {
+
+    List<ViewInfoWritable> viewInfoTrackerList;
+
+    public PhoenixMultiViewInputSplit() {
+        this.viewInfoTrackerList = new ArrayList<>();
+    }
+
+    public PhoenixMultiViewInputSplit(List<ViewInfoWritable> viewInfoTracker) {
+        this.viewInfoTrackerList = viewInfoTracker;
+    }
+
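+    // Serialization format: a vint count of entries followed by each ViewInfoTracker's
+    // own Writable fields; readFields below rebuilds the list symmetrically.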
+    @Override public void write(DataOutput output) throws IOException {
+        WritableUtils.writeVInt(output, this.viewInfoTrackerList.size());
+        for (ViewInfoWritable viewInfoWritable : this.viewInfoTrackerList) {
+            if (viewInfoWritable instanceof ViewInfoTracker) {
+                viewInfoWritable.write(output);
+            }
+        }
+    }
+
+    @Override public void readFields(DataInput input) throws IOException {
+        int count = WritableUtils.readVInt(input);
+        for (int i = 0; i < count; i++) {
+            ViewInfoTracker viewInfoTracker = new ViewInfoTracker();
+            viewInfoTracker.readFields(input);
+            this.viewInfoTrackerList.add(viewInfoTracker);
+        }
+    }
+
+    @Override public long getLength() {
+        return 0;
+    }
+
+    @Override public String[] getLocations() {
+        return new String[0];
+    }
+
+    public List<ViewInfoWritable> getViewInfoTrackerList() {
+        return this.viewInfoTrackerList;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java
new file mode 100644
index 0000000..f1d7662
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
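+/**
+ * RecordReader that iterates over the ViewInfoWritable entries carried by a
+ * PhoenixMultiViewInputSplit, handing one view (or view index) to the mapper per
+ * nextKeyValue()/getCurrentValue() call.
+ */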
+public class PhoenixMultiViewReader<T extends Writable> extends RecordReader<NullWritable,T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMultiViewReader.class);
+
+    private Configuration configuration;
+    private Class<T> inputClass;
+    Iterator<ViewInfoWritable> it;
+
+    public PhoenixMultiViewReader() {
+
+    }
+
+    public PhoenixMultiViewReader(final Class<T> inputClass, final Configuration configuration) {
+        this.configuration = configuration;
+        this.inputClass = inputClass;
+    }
+
+    @Override public void initialize(InputSplit split, TaskAttemptContext context)
+            throws IOException {
+        if (split instanceof PhoenixMultiViewInputSplit) {
+            final PhoenixMultiViewInputSplit pSplit = (PhoenixMultiViewInputSplit)split;
+            final List<ViewInfoWritable> viewInfoTracker = pSplit.getViewInfoTrackerList();
+            it = viewInfoTracker.iterator();
+        } else {
+            LOGGER.error("InputSplit class cannot cast to PhoenixMultiViewInputSplit.");
+            throw new IOException("InputSplit class cannot cast to PhoenixMultiViewInputSplit");
+        }
+    }
+
+    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+        return it.hasNext();
+    }
+
+    @Override public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return null;
+    }
+
+    @Override public T getCurrentValue() throws IOException, InterruptedException {
+        ViewInfoWritable currentValue = null;
+        if (it.hasNext()) {
+            currentValue = it.next();
+        }
+        return (T)currentValue;
+    }
+
+    @Override public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    @Override public void close() throws IOException {
+
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
new file mode 100644
index 0000000..3e104fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class PhoenixTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker,
+        NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_RETRY_SLEEP_TIME_IN_MS = 10000;
+
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            if (config.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ) != null) {
+                LOGGER.info("Using customized tracker class : " + config.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+                defaultViewDeletionTrackerClass = Class.forName(config.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+            } else {
+                LOGGER.info("Using default tracker class ");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker)
+                    defaultViewDeletionTrackerClass.newInstance();
+        } catch (Exception e) {
+            LOGGER.error("Getting exception While initializing initMultiViewJobStatusTracker " +
+                    "with error message " + e.getMessage());
+            throw e;
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, ViewInfoTracker value, Context context)
+            throws IOException {
+        try {
+            final Configuration config = context.getConfiguration();
+
+            if (this.multiViewJobStatusTracker == null) {
+                initMultiViewJobStatusTracker(config);
+            }
+
+            LOGGER.debug(String.format("Deleting from view %s, TenantID %s, and TTL value: %d",
+                    value.getViewName(), value.getTenantId(), value.getPhoenixTtl()));
+
+            deleteExpiredRows(value, config, context);
+
+        } catch (SQLException e) {
+            LOGGER.error("Mapper got an exception while deleting expired rows : "
+                    + e.getMessage() );
+            throw new IOException(e.getMessage(), e.getCause());
+        } catch (Exception e) {
+            LOGGER.error("Getting IOException while running View TTL Deletion Job mapper " +
+                    "with error : " + e.getMessage());
+            throw new IOException(e.getMessage(), e.getCause());
+        }
+    }
+
+    private void deleteExpiredRows(ViewInfoTracker value, Configuration config, Context context)
+            throws Exception {
+        try (PhoenixConnection connection =
+                     (PhoenixConnection) ConnectionUtil.getInputConnection(config)) {
+            if (value.getTenantId() != null && !value.getTenantId().equals("NULL")) {
+                Properties props = new Properties();
+                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, value.getTenantId());
+
+                try (PhoenixConnection tenantConnection = (PhoenixConnection)
+                        DriverManager.getConnection(connection.getURL(), props)) {
+                    deleteExpiredRows(tenantConnection, value, config, context);
+                }
+            } else {
+                deleteExpiredRows(connection, value, config, context);
+            }
+        }
+    }
+
+    /*
+     * Each mapper that receives a PhoenixMultiViewInputSplit executes a delete scan
+     * (with the DELETE_PHOENIX_TTL_EXPIRED attribute) once per view, for all the views
+     * and view indexes in the split. Each delete scan is bounded by the view's start and
+     * stop row keys and carries the TTL attributes and the delete hint.
+     */
+    private void deleteExpiredRows(PhoenixConnection connection, ViewInfoTracker viewInfoTracker,
+                                   Configuration config, Context context) throws Exception {
+        try (PhoenixStatement pstmt =
+                     new PhoenixStatement(connection).unwrap(PhoenixStatement.class)) {
+            PTable pTable = PhoenixRuntime.getTable(connection, viewInfoTracker.getViewName());
+            String deleteIfExpiredStatement = "SELECT /*+ NO_INDEX */ count(*) FROM " +
+                    viewInfoTracker.getViewName();
+
+            if (viewInfoTracker.isIndexRelation()) {
+                pTable = PhoenixRuntime.getTable(connection, viewInfoTracker.getRelationName());
+                deleteIfExpiredStatement = "SELECT count(*) FROM " +
+                        viewInfoTracker.getRelationName();
+            }
+
+            String sourceTableName = pTable.getTableName().getString();
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.INITIALIZED.getValue(), config, 0, context.getJobName());
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(pTable);
+            byte[] emptyColumnName = pTable.getEncodingScheme() ==
+                    PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            pTable.getEncodingScheme().encode(
+                                    QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
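+            // The scan attributes below are consumed server side (BaseScannerRegionObserver)
+            // and turn this aggregate scan into a deletion of PHOENIX_TTL-expired rows
+            // on the source table.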
+            scan.setAttribute(
+                    BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.PHOENIX_TTL,
+                    Bytes.toBytes(viewInfoTracker.getPhoenixTtl()));
+            scan.setAttribute(
+                    BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME,
+                    Bytes.toBytes(sourceTableName));
+
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.RUNNING.getValue(), config, 0, context.getJobName());
+
+            addingDeletionMarkWithRetries(pstmt, viewInfoTracker, config, context,
+                    queryPlan);
+        } catch (Exception e) {
+            if (e instanceof SQLException && ((SQLException) e).getErrorCode() ==
+                    SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()) {
+                this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                        ViewInfoJobState.DELETED.getValue(), config, 0, context.getJobName());
+            }
+            LOGGER.error(String.format("Had an issue to process the view: %s, " +
+                    "see error %s ", viewInfoTracker.toString(),e.getMessage()));
+        }
+    }
+
+    private boolean addingDeletionMarkWithRetries(PhoenixStatement stmt,
+                                                  ViewInfoTracker viewInfoTracker,
+                                                  Configuration config, Context context,
+                                                  QueryPlan queryPlan)
+            throws Exception {
+        int retry = 0;
+        long startTime = System.currentTimeMillis();
+        String viewInfo = viewInfoTracker.getTenantId() == null ?
+                viewInfoTracker.getViewName() : viewInfoTracker.getTenantId()
+                + "." + viewInfoTracker.getViewName();
+
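+        // Retry the delete scan up to DEFAULT_MAX_RETRIES times, sleeping between
+        // attempts. A TABLE_UNDEFINED error means the view was dropped mid-run; it is
+        // recorded as DELETED and not retried.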
+        while (retry < DEFAULT_MAX_RETRIES) {
+            try {
+                PhoenixResultSet rs = stmt.newResultSet(
+                        queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
+
+                long numberOfDeletedRows = 0;
+                if (rs.next()) {
+                    numberOfDeletedRows = rs.getLong(1);
+                }
+                this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, numberOfDeletedRows,
+                        ViewInfoJobState.SUCCEEDED.getValue(), config,
+                        System.currentTimeMillis() - startTime, context.getJobName());
+                PhoenixTTLTool.MR_COUNTER_METRICS metricsStatus =
+                        viewInfoTracker.isIndexRelation() ?
+                        PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_INDEX_SUCCEED :
+                        PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_SUCCEED;
+                context.getCounter(metricsStatus).increment(1);
+                return true;
+            } catch (Exception e) {
+                PhoenixTTLTool.MR_COUNTER_METRICS metricsStatus =
+                        viewInfoTracker.isIndexRelation() ?
+                        PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_INDEX_FAILED :
+                        PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_FAILED;
+                if (e instanceof SQLException && ((SQLException) e).getErrorCode() ==
+                        SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()) {
+                    LOGGER.info(viewInfo + " has been deleted : " + e.getMessage());
+                    this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                            ViewInfoJobState.DELETED.getValue(), config, 0, context.getJobName());
+                    context.getCounter(metricsStatus).increment(1);
+                    return false;
+                }
+                retry++;
+
+                if (retry == DEFAULT_MAX_RETRIES) {
+                    LOGGER.error("Deleting " + viewInfo + " expired rows has an exception for : "
+                            + e.getMessage());
+                    this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                            ViewInfoJobState.FAILED.getValue(), config, 0, context.getJobName());
+                    context.getCounter(metricsStatus).increment(1);
+                    throw e;
+                } else {
+                    Thread.sleep(DEFAULT_RETRY_SLEEP_TIME_IN_MS);
+                }
+            }
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java
new file mode 100644
index 0000000..153e7be
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class PhoenixTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixTTLTool.class);
+
+    public enum MR_COUNTER_METRICS {
+        VIEW_FAILED,
+        VIEW_SUCCEED,
+        VIEW_INDEX_FAILED,
+        VIEW_INDEX_SUCCEED
+    }
+
+    public static final String DELETE_ALL_VIEWS = "DELETE_ALL_VIEWS";
+    public static final int DEFAULT_MAPPER_SPLIT_SIZE = 10;
+    public static final int DEFAULT_QUERY_BATCH_SIZE = 100;
+
+    private static final Option DELETE_ALL_VIEWS_OPTION = new Option("a", "all", false,
+            "Delete all views from all tables.");
+    private static final Option VIEW_NAME_OPTION = new Option("v", "view", true,
+            "Name of the Phoenix view to delete expired rows from.");
+    private static final Option TENANT_ID_OPTION = new Option("i", "id", true,
+            "Delete views based on the tenant id.");
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0(highest) to 4");
+    private static final Option SPLIT_SIZE_OPTION = new Option("s", "split-size-per-mapper", true,
+            "Define split size for each mapper.");
+    private static final Option BATCH_SIZE_OPTION = new Option("b", "batch-size-for-query-more", true,
+            "Define batch size for fetching views metadata from syscat.");
+    private static final Option RUN_FOREGROUND_OPTION = new Option("runfg",
+            "run-foreground", false, "If specified, runs PhoenixTTLTool " +
+            "in Foreground. Default - Runs the build in background");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
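+    /*
+     * Illustrative invocations (a sketch; the launcher and jar name depend on the
+     * deployment, and MY_SCHEMA.MY_VIEW is a placeholder view name):
+     *   hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixTTLTool -runfg -a
+     *   hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixTTLTool -runfg -v MY_SCHEMA.MY_VIEW
+     */
+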
+    private Configuration configuration;
+    private Connection connection;
+    private String viewName;
+    private String tenantId;
+    private String jobName;
+    private boolean isDeletingAllViews;
+    private JobPriority jobPriority;
+    private boolean isForeground;
+    private int splitSize;
+    private int batchSize;
+    private Job job;
+
+    public void parseArgs(String[] args) {
+        CommandLine cmdLine;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            throw e;
+        }
+
+        if (getConf() == null) {
+            setConf(HBaseConfiguration.create());
+        }
+
+        if (cmdLine.hasOption(DELETE_ALL_VIEWS_OPTION.getOpt())) {
+            this.isDeletingAllViews = true;
+        } else if (cmdLine.hasOption(VIEW_NAME_OPTION.getOpt())) {
+            viewName = cmdLine.getOptionValue(VIEW_NAME_OPTION.getOpt());
+            this.isDeletingAllViews = false;
+        }
+
+        if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            tenantId = cmdLine.getOptionValue((TENANT_ID_OPTION.getOpt()));
+        }
+
+        if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) {
+            splitSize = Integer.parseInt(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+        } else {
+            splitSize = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        if (cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt())) {
+            batchSize = Integer.parseInt(cmdLine.getOptionValue(BATCH_SIZE_OPTION.getOpt()));
+        } else {
+            batchSize = DEFAULT_QUERY_BATCH_SIZE;
+        }
+
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    }
+
+    public String getJobPriority() {
+        return this.jobPriority.toString();
+    }
+
+    private JobPriority getJobPriority(CommandLine cmdLine) {
+        String jobPriorityOption = cmdLine.getOptionValue(JOB_PRIORITY_OPTION.getOpt());
+        if (jobPriorityOption == null) {
+            return JobPriority.NORMAL;
+        }
+
+        switch (jobPriorityOption) {
+            case "0" : return JobPriority.VERY_HIGH;
+            case "1" : return JobPriority.HIGH;
+            case "2" : return JobPriority.NORMAL;
+            case "3" : return JobPriority.LOW;
+            case "4" : return JobPriority.VERY_LOW;
+            default:
+                return JobPriority.NORMAL;
+        }
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public boolean isDeletingAllViews() {
+        return this.isDeletingAllViews;
+    }
+
+    public String getTenantId() {
+        return this.tenantId;
+    }
+
+    public String getViewName() {
+        return this.viewName;
+    }
+
+    public int getSplitSize() {
+        return this.splitSize;
+    }
+
+    public int getBatchSize() {
+        return this.batchSize;
+    }
+
+    public CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(),
+                    options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(DELETE_ALL_VIEWS_OPTION.getOpt()) &&
+                !cmdLine.hasOption(VIEW_NAME_OPTION.getOpt()) &&
+                !cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            throw new IllegalStateException("No deletion job is specified, " +
+                    "please specify a deletion job at the ALL, VIEW, or TENANT level");
+        }
+
+        this.jobPriority = getJobPriority(cmdLine);
+
+        return cmdLine;
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(DELETE_ALL_VIEWS_OPTION);
+        options.addOption(VIEW_NAME_OPTION);
+        options.addOption(TENANT_ID_OPTION);
+        options.addOption(HELP_OPTION);
+        options.addOption(JOB_PRIORITY_OPTION);
+        options.addOption(RUN_FOREGROUND_OPTION);
+        options.addOption(SPLIT_SIZE_OPTION);
+        options.addOption(BATCH_SIZE_OPTION);
+
+        return options;
+    }
+
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        LOGGER.error(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getJobName() {
+        if (this.jobName == null) {
+            String jobName;
+            if (this.isDeletingAllViews) {
+                jobName = DELETE_ALL_VIEWS;
+            } else if (this.getViewName() != null) {
+                jobName = this.getViewName();
+            } else {
+                jobName = this.tenantId;
+            }
+            this.jobName = "PhoenixTTLTool-" + jobName + "-";
+        }
+
+        return this.jobName;
+    }
+
+    public void setPhoenixTTLJobInputConfig(Configuration configuration) {
+        if (this.isDeletingAllViews) {
+            configuration.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS,
+                    DELETE_ALL_VIEWS);
+        } else if (this.getViewName() != null) {
+            configuration.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                    this.viewName);
+        }
+
+        if (this.tenantId != null) {
+            configuration.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, this.tenantId);
+        }
+    }
+
+    public void configureJob() throws Exception {
+        this.job = Job.getInstance(getConf(), getJobName());
+        PhoenixMapReduceUtil.setInput(job, this);
+
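+        // Map-only job: each mapper issues its own delete scans and emits nothing,
+        // hence zero reducers and a NullOutputFormat.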
+        job.setJarByClass(PhoenixTTLTool.class);
+        job.setMapperClass(PhoenixTTLDeleteJobMapper.class);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(NullWritable.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setNumReduceTasks(0);
+        job.setPriority(this.jobPriority);
+
+        TableMapReduceUtil.addDependencyJars(job);
+        LOGGER.info("PhoenixTTLTool is running for " + job.getJobName());
+    }
+
+    public int runJob() {
+        try {
+            if (isForeground) {
+                LOGGER.info("Running PhoenixTTLTool in foreground. " +
+                        "Runs full table scans. This may take a long time!");
+                return (job.waitForCompletion(true)) ? 0 : 1;
+            } else {
+                LOGGER.info("Running PhoenixTTLTool in Background - Submit async and exit");
+                job.submit();
+                return 0;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Caught exception " + e + " trying to run PhoenixTTLTool.");
+            return 1;
+        }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        connection = null;
+        int ret;
+        try {
+            parseArgs(args);
+            configuration = HBaseConfiguration.addHbaseResources(getConf());
+            connection = ConnectionUtil.getInputConnection(configuration, new Properties());
+            configureJob();
+            TableMapReduceUtil.initCredentials(job);
+            ret = runJob();
+        } catch (Exception e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            return -1;
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        return ret;
+    }
+
+    public static void main(final String[] args) throws Exception {
+        int result = ToolRunner.run(new PhoenixTTLTool(), args);
+        System.exit(result);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java
new file mode 100644
index 0000000..6e2f4f1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultMultiViewJobStatusTracker implements MultiViewJobStatusTracker {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultMultiViewJobStatusTracker.class);
+
+    public void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
+                                Configuration config, long duration, String mrJobName) {
+        if (state == ViewInfoJobState.SUCCEEDED.getValue()) {
+            LOGGER.debug(String.format("Number of deleted rows from view %s, TenantID %s, " +
+                            "and Source Table Name %s : " +
+                            "number of deleted row %d, duration : %d, mr job name : %s.",
+                    view.getViewName(), view.getTenantId(), view.getRelationName(),
+                    numberOfDeletedRows, duration, mrJobName));
+        } else if (state == ViewInfoJobState.DELETED.getValue()) {
+            LOGGER.debug(String.format("View has been deleted, view info : view %s, TenantID %s, " +
+                    "and Source Table Name %s," +
+                    " mr job name : %s.", view.getViewName(), view.getTenantId(),
+                    view.getRelationName(), mrJobName));
+        } else {
+            LOGGER.debug(String.format("Job is in state %d for view %s, TenantID %s, " +
+                    "Source Table Name %s , and duration : %d, " +
+                    "mr job name : %s.", state, view.getViewName(), view.getTenantId(),
+                    view.getRelationName(), duration, mrJobName));
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
new file mode 100644
index 0000000..00eee09
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.PhoenixTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {
+
+    public List<InputSplit> generateSplits(List<ViewInfoWritable> views,
+                                           Configuration configuration) {
+        int numViewsInSplit = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
+
+        if (numViewsInSplit < 1) {
+            numViewsInSplit = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        int numberOfMappers = getNumberOfMappers(views.size(), numViewsInSplit);
+
+        final List<InputSplit> pSplits = Lists.newArrayListWithExpectedSize(numberOfMappers);
+        // Split the views into splits
+
+        for (int i = 0; i < numberOfMappers; i++) {
+            pSplits.add(new PhoenixMultiViewInputSplit(views.subList(
+                    i * numViewsInSplit, getUpperBound(numViewsInSplit, i, views.size()))));
+        }
+
+        return pSplits;
+    }
+
+    /*
+        Calculate the number of mappers needed based on the split policy and the
+        number of views on the cluster.
+     */
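+    // e.g., 12 views with 10 views per split -> 12 / 10 = 1 full split plus a
+    // non-empty remainder -> 2 mappers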
+    public int getNumberOfMappers(int viewSize, int numViewsInSplit) {
+        int numberOfMappers = viewSize / numViewsInSplit;
+        if (viewSize % numViewsInSplit > 0) {
+            numberOfMappers++;
+        }
+        return numberOfMappers;
+    }
+
+    /*
+        Calculate the upper bound for each mapper. For example, given a split policy of
+        10 cleanup jobs per mapper and a total of 12 views on the cluster:
+        the first mapper takes [0 - 10), so this method returns 10 as the upper bound;
+        the second mapper takes [10 - 12), so this method returns 12 as the upper bound.
+     */
+    public int getUpperBound(int numViewsInSplit, int i, int viewSize) {
+        int upper = (i + 1) * numViewsInSplit;
+        if (viewSize < upper) {
+            upper = viewSize;
+        }
+
+        return upper;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
new file mode 100644
index 0000000..a9bd79e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+        boolean isFetchAll = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null;
+
+        if (!isFetchAll) {
+            return getTenantOrViewMultiViewList(configuration);
+        }
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+        boolean isQueryMore = true;
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+
+        int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+
+        String schema = null;
+        String tableName = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW);
+        if (tableName != null) {
+            schema = SchemaUtil.getSchemaNameFromFullName(tableName);
+        }
+        String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)){
+            try (PreparedStatement stmt = connection.prepareStatement(query)) {
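+                // Page through the catalog in batches of 'limit' rows: the bind values
+                // are re-seeded from the last row of each page so the next query resumes
+                // after it, and an empty page ends the loop.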
+                do {
+                    stmt.setString(1, tenantId);
+                    stmt.setString(2, schema);
+                    stmt.setString(3, tableName);
+                    stmt.setInt(4, limit);
+
+                    ResultSet viewRs = stmt.executeQuery();
+                    String fullTableName = null;
+
+                    while (viewRs.next()) {
+                        tenantId = viewRs.getString(1);
+                        schema = viewRs.getString(2);
+                        tableName = viewRs.getString(3);
+                        fullTableName = tableName;
+                        Long viewTtlValue = viewRs.getLong(4);
+
+                        if (schema != null && schema.length() > 0) {
+                            fullTableName = SchemaUtil.getTableName(schema, tableName);
+                        }
+
+                        if (!isParentHasTTL(connection, tenantId, fullTableName)) {
+                            addingViewIndexToTheFinalList(connection,tenantId,fullTableName,
+                                    viewTtlValue, viewInfoWritables);
+                        }
+                    }
+                    if (fullTableName == null) {
+                        isQueryMore = false;
+                    }
+                } while (isQueryMore);
+            }
+
+        } catch (Exception e) {
+            LOGGER.error("Getting view info failed with: " + e.getMessage());
+        }
+        return viewInfoWritables;
+    }
+
+    public List<ViewInfoWritable> getTenantOrViewMultiViewList(Configuration configuration) {
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)) {
+            try (Statement stmt = connection.createStatement()) {
+                ResultSet viewRs = stmt.executeQuery(query);
+                while (viewRs.next()) {
+                    String tenantId = viewRs.getString(1);
+                    String schema = viewRs.getString(2);
+                    String tableName = viewRs.getString(3);
+                    Long viewTtlValue = viewRs.getLong(4);
+                    String fullTableName = tableName;
+
+                    if (schema != null && schema.length() > 0) {
+                        fullTableName = SchemaUtil.getTableName(schema, tableName);
+                    }
+
+                    if (!isParentHasTTL(connection, tenantId, fullTableName)) {
+                        addingViewIndexToTheFinalList(connection,tenantId,fullTableName,
+                                viewTtlValue, viewInfoWritables);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Getting view info failed with: " + e.getMessage());
+        }
+        return viewInfoWritables;
+    }
+
+    private boolean isParentHasTTL(PhoenixConnection connection,
+                                   String tenantId, String fullTableName) {
+        boolean skip = false;
+        try {
+            PTable pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+            PTable parentTable = PhoenixRuntime.getTable(connection, null,
+                    pTable.getParentName().toString());
+            if (parentTable.getType() == PTableType.VIEW &&
+                    parentTable.getPhoenixTTL() > 0) {
+                            /* if the current view's parent already has a TTL value, we
+                            skip the current view's cleanup job, because we want to run
+                             the cleanup job at the GlobalView level instead of running
+                             multiple jobs at the LeafView level for better performance.
+
+                                     BaseTable
+                               GlobalView(has TTL)
+                            LeafView1, LeafView2, LeafView3....
+                            */
+                skip = true;
+            }
+        } catch (Exception e) {
+            skip = true;
+            LOGGER.error(String.format("Had an issue to process the view: %s, " +
+                            "tenantId: see error %s ", fullTableName, tenantId,
+                    e.getMessage()));
+        }
+        return skip;
+    }
+
+    private void addingViewIndexToTheFinalList(PhoenixConnection connection, String tenantId,
+                                               String fullTableName, long viewTtlValue,
+                                               List<ViewInfoWritable> viewInfoWritables)
+            throws Exception {
+        PTable pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+        ViewInfoWritable viewInfoTracker = new ViewInfoTracker(
+                tenantId,
+                fullTableName,
+                viewTtlValue,
+                pTable.getPhysicalName().getString(),
+                false);
+        viewInfoWritables.add(viewInfoTracker);
+
+        List<PTable> allIndexesOnView = pTable.getIndexes();
+        for (PTable viewIndexTable : allIndexesOnView) {
+            String indexName = viewIndexTable.getTableName().getString();
+            String indexSchema = viewIndexTable.getSchemaName().getString();
+            if (indexName.contains(
+                    QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+                indexName = SchemaUtil.getTableNameFromFullName(indexName,
+                        QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+            }
+            indexName = SchemaUtil.getTableNameFromFullName(indexName);
+            indexName = SchemaUtil.getTableName(indexSchema, indexName);
+            ViewInfoWritable viewInfoTrackerForIndexEntry = new ViewInfoTracker(
+                    tenantId,
+                    fullTableName,
+                    viewTtlValue,
+                    indexName,
+                    true);
+            viewInfoWritables.add(viewInfoTrackerForIndexEntry);
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java
new file mode 100644
index 0000000..7520dbe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+
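+/**
+ * Callback used by the PHOENIX_TTL deletion mapper to report per-view job state
+ * (see ViewInfoWritable.ViewInfoJobState); the default implementation only logs.
+ */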
+public interface MultiViewJobStatusTracker {
+    void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
+                         Configuration config, long duration, String mrJobName);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java
new file mode 100644
index 0000000..d438005
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.util.List;
+
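+/**
+ * Strategy for grouping the views to be processed into MR input splits; the
+ * number of views per split is driven by MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE.
+ */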
+public interface MultiViewSplitStrategy {
+    List<InputSplit> generateSplits(List<ViewInfoWritable> views, Configuration configuration);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 82c1f2f..792b621 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -166,6 +166,27 @@ public final class PhoenixConfigurationUtil {
 
     public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
 
+    // number of views to group per mapper for the deletion job
+    public static final String MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE = "phoenix.mapreduce.multi.input.split.size";
+
+    public static final String MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE = "phoenix.mapreduce.multi.input.batch.size";
+
+    // phoenix ttl data deletion job for a specific view
+    public static final String MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW = "phoenix.mapreduce.phoenix_ttl.per_view";
+
+    // phoenix ttl data deletion job for all views.
+    public static final String MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS = "phoenix.mapreduce.phoenix_ttl.all";
+
+    // fully qualified class name for injecting a custom multi-input list provider
+    public static final String MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ = "phoenix.mapreduce.multi.input.strategy.path";
+
+    // fully qualified class name for injecting a custom multi-input split strategy
+    public static final String MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ = "phoenix.mapreduce.multi.split.strategy.path";
+
+    // fully qualified class name for injecting a custom multi-input mapper job status tracker
+    public static final String MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ = "phoenix.mapreduce.multi.mapper.tracker.path";
+
     /**
      * Determines type of Phoenix Map Reduce job.
      * 1. QUERY allows running arbitrary queries without aggregates
@@ -408,12 +429,28 @@ public final class PhoenixConfigurationUtil {
         
     }
 
-     public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException {
-         Preconditions.checkNotNull(configuration);
-         Preconditions.checkNotNull(upsertStmt);
-         configuration.set(UPSERT_STATEMENT, upsertStmt);
-     }
-    
+    public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(upsertStmt);
+        configuration.set(UPSERT_STATEMENT, upsertStmt);
+    }
+
+    public static void setMultiInputMapperSplitSize(Configuration configuration, final int splitSize) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, String.valueOf(splitSize));
+    }
+
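+    // Despite the "SplitSize" name, this setter/getter pair stores the "query more"
+    // batch size (MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE) used when fetching view
+    // metadata in batches.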
+    public static void setMultiViewQueryMoreSplitSize(Configuration configuration, final int batchSize) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE, String.valueOf(batchSize));
+    }
+
+    public static int getMultiViewQueryMoreSplitSize(final Configuration configuration) {
+        final String batchSize = configuration.get(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE);
+        Preconditions.checkNotNull(batchSize);
+        return Integer.parseInt(batchSize);
+    }
+
     public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
         Preconditions.checkNotNull(configuration);
         List<ColumnInfo> columnMetadataList = null;
@@ -438,6 +475,12 @@ public final class PhoenixConfigurationUtil {
         return columnMetadataList;
     }
 
+    public static int getMultiViewSplitSize(final Configuration configuration) {
+        final String splitSize = configuration.get(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE);
+        Preconditions.checkNotNull(splitSize);
+        return Integer.parseInt(splitSize);
+    }
+
     private static List<String> getSelectColumnList(
             final Configuration configuration) {
     	List<String> selectColumnList = PhoenixConfigurationUtil.getSelectColumnNames(configuration);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index 6c23fd9..cab2361 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputFormat;
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 
 import java.io.IOException;
@@ -127,6 +129,21 @@ public final class PhoenixMapReduceUtil {
 
     /**
      *
+     * @param job MR job instance
+     * @param tool PhoenixTTLTool for the Phoenix TTL deletion MR job
+     */
+    public static void setInput(final Job job, PhoenixTTLTool tool) {
+        Configuration configuration = job.getConfiguration();
+        job.setInputFormatClass(PhoenixMultiViewInputFormat.class);
+        tool.setPhoenixTTLJobInputConfig(configuration);
+        PhoenixConfigurationUtil.setSchemaType(configuration,
+                PhoenixConfigurationUtil.SchemaType.QUERY);
+        PhoenixConfigurationUtil.setMultiInputMapperSplitSize(configuration, tool.getSplitSize());
+        PhoenixConfigurationUtil.setMultiViewQueryMoreSplitSize(configuration, tool.getBatchSize());
+    }
+
+    /**
+     *
      * @param job
      * @param inputClass DBWritable class
      * @param snapshotName The name of a snapshot (of a table) to read from
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
new file mode 100644
index 0000000..d25e0e3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+
+public class PhoenixMultiInputUtil {
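+    // Selects all non-mapped views from SYSTEM.CATALOG that have a PHOENIX_TTL value set.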
+    public static final String SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY =
+            "SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHOENIX_TTL FROM " +
+                    SYSTEM_CATALOG_NAME + " WHERE " +
+                    TABLE_TYPE + " = '" + PTableType.VIEW.getSerializedValue() + "' AND " +
+                    PHOENIX_TTL + " IS NOT NULL AND " +
+                    PHOENIX_TTL + " > " + PHOENIX_TTL_NOT_DEFINED + " AND " +
+                    VIEW_TYPE + " <> " + PTable.ViewType.MAPPED.getSerializedValue();
+
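+    // Opens a connection scoped to the given tenant via the TenantId connection property.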
+    public static Connection buildTenantConnection(String url, String tenantId)
+            throws SQLException {
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(url, props);
+    }
+
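+    // Pages through SYSTEM.CATALOG in batches, using the (TENANT_ID, TABLE_SCHEM, TABLE_NAME)
+    // row key as the paging cursor and LIMIT as the batch size.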
+    public static String getSelectAllPageQuery() {
+        return SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY + " AND " +
+                "(TENANT_ID,TABLE_SCHEM,TABLE_NAME) > (?,?,?) LIMIT ?";
+    }
+
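+    /**
+     * Builds the SYSCAT metadata query for a single view. For example, given a
+     * (hypothetical) view MY_SCHEMA.MY_VIEW owned by tenant T1, the appended
+     * predicate is: AND TABLE_SCHEM = 'MY_SCHEMA' AND TABLE_NAME = 'MY_VIEW'
+     * AND TENANT_ID = 'T1'. Passing DELETE_ALL_VIEWS returns the unfiltered query.
+     */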
+    public static String constructViewMetadataQueryBasedOnView(String fullName, String tenantId) {
+        String query = SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY;
+
+        if (fullName != null) {
+            if (fullName.equals(PhoenixTTLTool.DELETE_ALL_VIEWS)) {
+                return query;
+            }
+
+            String schema = SchemaUtil.getSchemaNameFromFullName(fullName);
+            String viewName = SchemaUtil.getTableNameFromFullName(fullName);
+
+            if (!schema.equals(StringUtil.EMPTY_STRING)) {
+                query = query + " AND TABLE_SCHEM = '" + schema + "'";
+            } else {
+                query = query + " AND TABLE_SCHEM IS NULL";
+            }
+
+            query = query + " AND TABLE_NAME = '" + viewName + "'";
+        }
+
+        if (tenantId != null && tenantId.length() > 0) {
+            query = query + " AND TENANT_ID = '" + tenantId + "'";
+        } else {
+            query = query + " AND TENANT_ID IS NULL";
+        }
+
+        return query;
+    }
+
+    public static String constructViewMetadataQueryBasedOnTenant(String tenant) {
+        return constructViewMetadataQueryBasedOnView(null, tenant);
+    }
+
+    public static String getFetchViewQuery(Configuration configuration) {
+        String query;
+        if (configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null) {
+            query = PhoenixMultiInputUtil.getSelectAllPageQuery();
+        } else if (configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID) != null &&
+                configuration.get(PhoenixConfigurationUtil.
+                        MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW) == null) {
+            query = PhoenixMultiInputUtil.constructViewMetadataQueryBasedOnTenant(
+                    configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID));
+        } else {
+            query = PhoenixMultiInputUtil.constructViewMetadataQueryBasedOnView(
+                    configuration.get(
+                            PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW),
+                    configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID));
+        }
+        return query;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java
new file mode 100644
index 0000000..154f47f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import java.util.List;
+
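+/**
+ * Supplies the list of view (and view-index) entries that the MR job should
+ * process, typically discovered by querying SYSTEM.CATALOG.
+ */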
+public interface PhoenixMultiViewListProvider {
+    List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
new file mode 100644
index 0000000..a4f3226
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ViewInfoTracker implements ViewInfoWritable {
+
+    String tenantId;
+    String viewName;
+    String relationName;
+    long phoenixTtl;
+    boolean isIndexRelation;
+
+    public ViewInfoTracker() {
+
+    }
+
+    public ViewInfoTracker(String tenantId, String viewName, long phoenixTtl,
+                           String relationName, boolean isIndexRelation) {
+        setTenantId(tenantId);
+        this.viewName = viewName;
+        this.phoenixTtl = phoenixTtl;
+        this.relationName = relationName;
+        this.isIndexRelation = isIndexRelation;
+    }
+
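+    // A null tenantId (i.e. a global view) is left unset so the field stays null.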
+    private void setTenantId(String tenantId) {
+        if (tenantId != null) {
+            this.tenantId = tenantId;
+        }
+    }
+
+    @Override
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    @Override
+    public String getViewName() {
+        return viewName;
+    }
+
+    @Override
+    public String getRelationName() {
+        return relationName;
+    }
+
+    @Override
+    public boolean isIndexRelation() {
+        return this.isIndexRelation;
+    }
+
+    public long getPhoenixTtl() {
+        return phoenixTtl;
+    }
+
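+    // Serialization note: the field order below must match readFields() exactly;
+    // isIndexRelation is encoded as the string "true"/"false".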
+    @Override public void write(DataOutput output) throws IOException {
+        WritableUtils.writeString(output, tenantId);
+        WritableUtils.writeString(output, viewName);
+        WritableUtils.writeVLong(output, phoenixTtl);
+        WritableUtils.writeString(output, relationName);
+        WritableUtils.writeString(output, isIndexRelation ? "true" : "false");
+    }
+
+    @Override public void readFields(DataInput input) throws IOException {
+        setTenantId(WritableUtils.readString(input));
+        viewName = WritableUtils.readString(input);
+        phoenixTtl = WritableUtils.readVLong(input);
+        relationName = WritableUtils.readString(input);
+        isIndexRelation = WritableUtils.readString(input).equals("true");
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ViewName" + this.viewName);
+        if (this.tenantId != null) {
+            sb.append(", Tenant:" + this.tenantId);
+        }
+        if (this.isIndexRelation) {
+            sb.append(", IndexName:" + this.relationName);
+        } else {
+            sb.append(", BaseTableName:" + this.relationName);
+        }
+
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java
new file mode 100644
index 0000000..48a08e2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
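+/**
+ * Writable view metadata handed from the input split to each mapper; also defines
+ * the job-state codes reported through MultiViewJobStatusTracker.
+ */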
+public interface ViewInfoWritable extends Writable {
+    public enum ViewInfoJobState {
+        INITIALIZED(1),
+        RUNNING(2),
+        SUCCEEDED(3),
+        FAILED(4),
+        KILLED(5),
+        DELETED(6);
+
+        int value;
+
+        ViewInfoJobState(int value) {
+            this.value = value;
+        }
+
+        public int getValue() {
+            return this.value;
+        }
+    }
+
+    void write(DataOutput output) throws IOException;
+    void readFields(DataInput input) throws IOException;
+    String getTenantId();
+    String getViewName();
+    String getRelationName(); // from index or data table
+    boolean isIndexRelation();
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 893f5d8..dab3a96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -336,7 +336,6 @@ public interface QueryServices extends SQLCloseable {
     // Flag indicating that server side masking of ttl expired rows is enabled.
     public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
 
-
     // Before 4.15 when we created a view we included the parent table column metadata in the view
     // metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent
     // table column metadata along with the child view metadata. When we resolve a child view, we
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java
new file mode 100644
index 0000000..3db85de
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.query.BaseTest;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE;
+import static org.junit.Assert.assertEquals;
+
+public class DefaultMultiViewSplitStrategyTest extends BaseTest {
+    DefaultMultiViewSplitStrategy defaultMultiViewSplitStrategy =
+            new DefaultMultiViewSplitStrategy();
+    @Test
+    public void testGetUpperBound() {
+        // given a split policy of 10 and a view size of 12,
+        // we expect 2 mappers with ranges [0,10) and [10,12)
+        assertEquals(10,
+                defaultMultiViewSplitStrategy.getUpperBound(10, 0, 12));
+        assertEquals(12,
+                defaultMultiViewSplitStrategy.getUpperBound(10, 1, 12));
+
+        // given a split policy of 8 and a view size of 12,
+        // we expect 2 mappers with ranges [0,8) and [8,12)
+        assertEquals(8,
+                defaultMultiViewSplitStrategy.getUpperBound(8, 0, 12));
+        assertEquals(12,
+                defaultMultiViewSplitStrategy.getUpperBound(8, 1, 12));
+
+        // given a split policy of 5 and a view size of 1,
+        // we expect 1 mapper with range [0,1)
+        assertEquals(1,
+                defaultMultiViewSplitStrategy.getUpperBound(5, 0, 1));
+    }
+
+    @Test
+    public void testGetNumberOfMappers() {
+        int viewSize = 0;
+        int numViewsInSplit = 10;
+
+        // test empty cluster, i.e. view size is 0
+        assertEquals(0,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+
+        viewSize = 9;
+        // test viewSize is less than numViewsInSplit
+        assertEquals(1,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+
+        // test viewSize is equal to numViewsInSplit
+        viewSize = 10;
+        assertEquals(1,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+
+        // test viewSize is greater than numViewsInSplit
+        viewSize = 11;
+        assertEquals(2,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize,numViewsInSplit));
+    }
+
+    @Test
+    public void testGenerateSplits() {
+        // test number of views greater than split policy
+        testGenerateSplits(11, 10, 2);
+
+        // test number of views equal to split policy
+        testGenerateSplits(10, 10, 1);
+
+        // test number of views less than split policy
+        testGenerateSplits(8, 10, 1);
+
+        // test number of views is 0
+        testGenerateSplits(0, 10, 0);
+
+        // test split policy is 0
+        testGenerateSplits(8, 0, 1);
+    }
+
+    private void testGenerateSplits(int numberOfViews, int splitPolicy, int expectedResultSize) {
+        List<ViewInfoWritable> views = new ArrayList<>();
+        for (int i = 0; i < numberOfViews; i++) {
+            views.add(new ViewInfoTracker());
+        }
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, String.valueOf(splitPolicy));
+        List<InputSplit> result = defaultMultiViewSplitStrategy.generateSplits(views, config);
+        assertEquals(expectedResultSize, result.size());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
new file mode 100644
index 0000000..89a5c5a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.fail;
+import static org.apache.phoenix.mapreduce.util.
+        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE;
+import static org.apache.phoenix.mapreduce.util.
+        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ;
+import static org.apache.phoenix.mapreduce.util.
+        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ;
+import static org.mockito.Mockito.when;
+
+public class PhoenixMultiViewInputFormatTest {
+
+    @Test
+    public void testDefaultConfig() throws Exception {
+        PhoenixMultiViewInputFormat multiViewInputFormat = new PhoenixMultiViewInputFormat();
+
+        Configuration config = new Configuration();
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        JobContext mockContext = Mockito.mock(JobContext.class);
+        when(mockContext.getConfiguration()).thenReturn(config);
+
+        // default run should not raise error
+        multiViewInputFormat.getSplits(mockContext);
+    }
+
+    @Test
+    public void testCustomizedInputStrategyClassNotExists() {
+        PhoenixMultiViewInputFormat multiViewInputFormat = new PhoenixMultiViewInputFormat();
+
+        Configuration config = new Configuration();
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        config.set(MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ, "dummy.path");
+        JobContext mockContext = Mockito.mock(JobContext.class);
+        when(mockContext.getConfiguration()).thenReturn(config);
+
+        try {
+            multiViewInputFormat.getSplits(mockContext);
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("ClassNotFoundException"));
+        }
+    }
+
+    @Test
+    public void testCustomizedInputSplitClassNotExists() {
+        PhoenixMultiViewInputFormat multiViewInputFormat = new PhoenixMultiViewInputFormat();
+
+        Configuration config = new Configuration();
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        config.set(MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ, "dummy.path");
+        JobContext mockContext = Mockito.mock(JobContext.class);
+        when(mockContext.getConfiguration()).thenReturn(config);
+
+        try {
+            multiViewInputFormat.getSplits(mockContext);
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("ClassNotFoundException"));
+        }
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
new file mode 100644
index 0000000..0e439ee
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+public class PhoenixMultiViewReaderTest {
+
+    @Test
+    public void test() throws Exception {
+        String tenantId = "Tenant1";
+        String viewName = "viewName1";
+        long ttl = 1;
+        String indexTable = "indexTable";
+        String globalView = "globalView";
+
+        PhoenixMultiViewInputSplit mockInput = Mockito.mock(PhoenixMultiViewInputSplit.class);
+        TaskAttemptContext mockContext = Mockito.mock(TaskAttemptContext.class);
+        List<ViewInfoWritable> viewInfoTracker = new ArrayList<>();
+        viewInfoTracker.add(new ViewInfoTracker(
+                tenantId,
+                viewName,
+                ttl,
+                globalView,
+                false
+        ));
+
+        viewInfoTracker.add(new ViewInfoTracker(
+                tenantId,
+                viewName,
+                ttl,
+                indexTable,
+                true
+        ));
+        when(mockInput.getViewInfoTrackerList()).thenReturn(viewInfoTracker);
+        PhoenixMultiViewReader phoenixMultiViewReader = new PhoenixMultiViewReader();
+        phoenixMultiViewReader.initialize(mockInput, mockContext);
+
+        ViewInfoTracker viewInfoWritable;
+        assertTrue(phoenixMultiViewReader.nextKeyValue());
+        viewInfoWritable = (ViewInfoTracker)phoenixMultiViewReader.getCurrentValue();
+        assertEquals(tenantId, viewInfoWritable.getTenantId());
+        assertEquals(viewName, viewInfoWritable.getViewName());
+        assertEquals(ttl, viewInfoWritable.getPhoenixTtl());
+        assertFalse(viewInfoWritable.isIndexRelation());
+
+        assertTrue(phoenixMultiViewReader.nextKeyValue());
+        viewInfoWritable = (ViewInfoTracker)phoenixMultiViewReader.getCurrentValue();
+        assertEquals(tenantId, viewInfoWritable.getTenantId());
+        assertEquals(viewName, viewInfoWritable.getViewName());
+        assertEquals(ttl, viewInfoWritable.getPhoenixTtl());
+        assertTrue(viewInfoWritable.isIndexRelation());
+
+        assertFalse(phoenixMultiViewReader.nextKeyValue());
+        assertNull(phoenixMultiViewReader.getCurrentValue());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java
new file mode 100644
index 0000000..a453283
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.phoenix.query.BaseTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PhoenixTTLToolTest extends BaseTest {
+    String viewName = generateUniqueName();
+    String tenantId = generateUniqueName();
+
+    @Test
+    public void testParseInput() {
+        PhoenixTTLTool tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-a"});
+
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertEquals(true, tool.isDeletingAllViews());
+        assertEquals(null, tool.getViewName());
+        assertEquals(null, tool.getTenantId());
+
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-i",tenantId });
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertEquals(false, tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertEquals(tenantId, tool.getTenantId());
+
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-p", "0"});
+        assertEquals("VERY_HIGH", tool.getJobPriority());
+        assertEquals(false, tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertEquals(null, tool.getTenantId());
+
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-p", "-1"});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertEquals(false, tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertEquals(null, tool.getTenantId());
+
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-p", "DSAFDAS"});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertEquals(false, tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertEquals(null, tool.getTenantId());
+
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-i", tenantId});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertEquals(false, tool.isDeletingAllViews());
+        assertEquals(null, tool.getViewName());
+        assertEquals(tenantId, tool.getTenantId());
+    }
+
+    @Test (expected = IllegalStateException.class)
+    public void testNoInputParam() {
+        PhoenixTTLTool tool;
+        tool = new PhoenixTTLTool();
+        tool.parseOptions(new String[] {});
+    }
+}
\ No newline at end of file