You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2020/05/04 18:11:19 UTC

[GitHub] [phoenix] ChinmaySKulkarni commented on a change in pull request #762: PHOENIX-5592 MapReduce job to asynchronously delete rows where the VIEW_TTL has expired

ChinmaySKulkarni commented on a change in pull request #762:
URL: https://github.com/apache/phoenix/pull/762#discussion_r419609789



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;

Review comment:
       Just curious, is there any reason we chose 202?

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {

Review comment:
       No need to catch and rethrow

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(

Review comment:
       Create statement inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;

Review comment:
       nit: I guess we want to use `org.junit.Assert.assertTrue`? 

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(

Review comment:
       Use try-with-resources for preparedStmt

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);

Review comment:
       Create statement inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,

Review comment:
       Is `tableName` expected to be a global view in this method? If not, it looks like this method just creates a view and upserts data into it. Can you rename the method or the variable accordingly?

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);

Review comment:
       Create statement inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);

Review comment:
       Create statement inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Fixed test values; static because they do not depend on the test instance.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    /**
+     * Sets the VIEW_TTL property of an existing view via an ALTER VIEW statement.
+     *
+     * @param conn connection the DDL is executed on
+     * @param viewName name of the view to alter
+     * @param view_ttl_value value assigned to VIEW_TTL
+     * @throws SQLException if creating or executing the statement fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds (PK1, PK2) primary key columns and sets
+     * VIEW_TTL, then upserts NUMBER_OF_UPSERT_ROWS rows into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(

Review comment:
       Use try-with-resources for the PreparedStatement.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Fixed test values; static because they do not depend on the test instance.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    /**
+     * Sets the VIEW_TTL property of an existing view via an ALTER VIEW statement.
+     *
+     * @param conn connection the DDL is executed on
+     * @param viewName name of the view to alter
+     * @param view_ttl_value value assigned to VIEW_TTL
+     * @throws SQLException if creating or executing the statement fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds (PK1, PK2) primary key columns and sets
+     * VIEW_TTL, then upserts NUMBER_OF_UPSERT_ROWS rows into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Creates a view with the given VIEW_TTL as SELECT * FROM {@code tableName}, then upserts
+     * NUMBER_OF_UPSERT_ROWS rows (ID, PK1, PK2, NUM) into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Asserts that an unfiltered scan of the HBase table {@code tableName} returns exactly
+     * {@code expectedRows} rows.
+     *
+     * @param tableName HBase table to scan
+     * @param expectedRows number of rows the scan must return
+     * @throws Exception if the connection, table or scan fails
+     */
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        // The connection is its own resource so it is closed too, not only the Table.
+        // NOTE(review): assumes createConnection(config) returns HConnection (inferred from the
+        // getTable(String) call) - confirm against HConnectionFactory.
+        try (org.apache.hadoop.hbase.client.HConnection hConnection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             Table table = hConnection.getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    /**
+     * Asserts that scanning the HBase table {@code tableName} for row keys matching the regular
+     * expression {@code regrex} returns exactly {@code expectedRows} rows.
+     *
+     * @param tableName HBase table to scan
+     * @param regrex regular expression applied to row keys through a RowFilter
+     * @param expectedRows number of matching rows the scan must return
+     * @throws Exception if the connection, table or scan fails
+     */
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        // The connection is its own resource so it is closed too, not only the Table.
+        // NOTE(review): assumes createConnection(config) returns HConnection (inferred from the
+        // getTable(String) call) - confirm against HConnectionFactory.
+        try (org.apache.hadoop.hbase.client.HConnection hConnection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             Table table = hConnection.getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table, scan));
+        }
+    }
+
+    /**
+     * Runs a SELECT COUNT(*) against {@code tableName}, restricted to {@code tenantId} when it
+     * is non-null, and asserts that the count equals {@code expectedRows}.
+     *
+     * @param tableName table or view to count rows in
+     * @param tenantId value matched against TENANT_ID, or null for no WHERE clause
+     * @param expectedRows row count the query must return
+     * @param conn connection the query is executed on
+     * @throws Exception if the query fails
+     */
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        StringBuilder sql = new StringBuilder("SELECT COUNT(*) FROM ").append(tableName);
+        if (tenantId != null) {
+            sql.append(" WHERE TENANT_ID = '").append(tenantId).append("'");
+        }
+        try (Statement countStmt = conn.createStatement()) {
+            ResultSet countRs = countStmt.executeQuery(sql.toString());
+            assertTrue(countRs.next());
+            assertEquals(expectedRows, countRs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();

Review comment:
       Close the scanner inside a finally block or, better yet, use try-with-resources.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Fixed test values; static because they do not depend on the test instance.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    /**
+     * Sets the VIEW_TTL property of an existing view via an ALTER VIEW statement.
+     *
+     * @param conn connection the DDL is executed on
+     * @param viewName name of the view to alter
+     * @param view_ttl_value value assigned to VIEW_TTL
+     * @throws SQLException if creating or executing the statement fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds (PK1, PK2) primary key columns and sets
+     * VIEW_TTL, then upserts NUMBER_OF_UPSERT_ROWS rows into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Creates a view with the given VIEW_TTL as SELECT * FROM {@code tableName}, then upserts
+     * NUMBER_OF_UPSERT_ROWS rows (ID, PK1, PK2, NUM) into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Asserts that an unfiltered scan of the HBase table {@code tableName} returns exactly
+     * {@code expectedRows} rows.
+     *
+     * @param tableName HBase table to scan
+     * @param expectedRows number of rows the scan must return
+     * @throws Exception if the connection, table or scan fails
+     */
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        // The connection is its own resource so it is closed too, not only the Table.
+        // NOTE(review): assumes createConnection(config) returns HConnection (inferred from the
+        // getTable(String) call) - confirm against HConnectionFactory.
+        try (org.apache.hadoop.hbase.client.HConnection hConnection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             Table table = hConnection.getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    /**
+     * Asserts that scanning the HBase table {@code tableName} for row keys matching the regular
+     * expression {@code regrex} returns exactly {@code expectedRows} rows.
+     *
+     * @param tableName HBase table to scan
+     * @param regrex regular expression applied to row keys through a RowFilter
+     * @param expectedRows number of matching rows the scan must return
+     * @throws Exception if the connection, table or scan fails
+     */
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        // The connection is its own resource so it is closed too, not only the Table.
+        // NOTE(review): assumes createConnection(config) returns HConnection (inferred from the
+        // getTable(String) call) - confirm against HConnectionFactory.
+        try (org.apache.hadoop.hbase.client.HConnection hConnection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             Table table = hConnection.getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table, scan));
+        }
+    }
+
+    /**
+     * Runs a SELECT COUNT(*) against {@code tableName}, restricted to {@code tenantId} when it
+     * is non-null, and asserts that the count equals {@code expectedRows}.
+     *
+     * @param tableName table or view to count rows in
+     * @param tenantId value matched against TENANT_ID, or null for no WHERE clause
+     * @param expectedRows row count the query must return
+     * @param conn connection the query is executed on
+     * @throws Exception if the query fails
+     */
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        StringBuilder sql = new StringBuilder("SELECT COUNT(*) FROM ").append(tableName);
+        if (tenantId != null) {
+            sql.append(" WHERE TENANT_ID = '").append(tenantId).append("'");
+        }
+        try (Statement countStmt = conn.createStatement()) {
+            ResultSet countRs = countStmt.executeQuery(sql.toString());
+            assertTrue(countRs.next());
+            assertEquals(expectedRows, countRs.getInt(1));
+        }
+    }
+
+    /**
+     * Counts the rows returned by running {@code scan} against {@code table}.
+     *
+     * @param table HBase table to scan
+     * @param scan scan (including any filter) to execute
+     * @return number of results the scanner produced
+     * @throws Exception if the scanner cannot be opened or iterated
+     */
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        long count = 0;
+        // try-with-resources closes the scanner even if iteration throws.
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            for (Result ignored : scanner) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);

Review comment:
       Create statement inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Fixed test values; static because they do not depend on the test instance.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    /**
+     * Sets the VIEW_TTL property of an existing view via an ALTER VIEW statement.
+     *
+     * @param conn connection the DDL is executed on
+     * @param viewName name of the view to alter
+     * @param view_ttl_value value assigned to VIEW_TTL
+     * @throws SQLException if creating or executing the statement fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds (PK1, PK2) primary key columns and sets
+     * VIEW_TTL, then upserts NUMBER_OF_UPSERT_ROWS rows into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Creates a view with the given VIEW_TTL as SELECT * FROM {@code tableName}, then upserts
+     * NUMBER_OF_UPSERT_ROWS rows (ID, PK1, PK2, NUM) into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Asserts that an unfiltered scan of the HBase table {@code tableName} returns exactly
+     * {@code expectedRows} rows.
+     *
+     * @param tableName HBase table to scan
+     * @param expectedRows number of rows the scan must return
+     * @throws Exception if the connection, table or scan fails
+     */
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        // The connection is its own resource so it is closed too, not only the Table.
+        // NOTE(review): assumes createConnection(config) returns HConnection (inferred from the
+        // getTable(String) call) - confirm against HConnectionFactory.
+        try (org.apache.hadoop.hbase.client.HConnection hConnection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             Table table = hConnection.getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    /**
+     * Asserts that scanning the HBase table {@code tableName} for row keys matching the regular
+     * expression {@code regrex} returns exactly {@code expectedRows} rows.
+     *
+     * @param tableName HBase table to scan
+     * @param regrex regular expression applied to row keys through a RowFilter
+     * @param expectedRows number of matching rows the scan must return
+     * @throws Exception if the connection, table or scan fails
+     */
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        // The connection is its own resource so it is closed too, not only the Table.
+        // NOTE(review): assumes createConnection(config) returns HConnection (inferred from the
+        // getTable(String) call) - confirm against HConnectionFactory.
+        try (org.apache.hadoop.hbase.client.HConnection hConnection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             Table table = hConnection.getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table, scan));
+        }
+    }
+
+    /**
+     * Runs a SELECT COUNT(*) against {@code tableName}, restricted to {@code tenantId} when it
+     * is non-null, and asserts that the count equals {@code expectedRows}.
+     *
+     * @param tableName table or view to count rows in
+     * @param tenantId value matched against TENANT_ID, or null for no WHERE clause
+     * @param expectedRows row count the query must return
+     * @param conn connection the query is executed on
+     * @throws Exception if the query fails
+     */
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        StringBuilder sql = new StringBuilder("SELECT COUNT(*) FROM ").append(tableName);
+        if (tenantId != null) {
+            sql.append(" WHERE TENANT_ID = '").append(tenantId).append("'");
+        }
+        try (Statement countStmt = conn.createStatement()) {
+            ResultSet countRs = countStmt.executeQuery(sql.toString());
+            assertTrue(countRs.next());
+            assertEquals(expectedRows, countRs.getInt(1));
+        }
+    }
+
+    /**
+     * Counts the rows returned by running {@code scan} against {@code table}.
+     *
+     * @param table HBase table to scan
+     * @param scan scan (including any filter) to execute
+     * @return number of results the scanner produced
+     * @throws Exception if the scanner cannot be opened or iterated
+     */
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        long count = 0;
+        // try-with-resources closes the scanner even if iteration throws.
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            for (Result ignored : scanner) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Creates a MULTI_TENANT table keyed by (TENANT_ID, ID) with a NUM column and
+     * COLUMN_ENCODED_BYTES = 0.
+     *
+     * @param conn connection the DDL is executed on
+     * @param tableName name of the table to create
+     * @throws Exception if creating or executing the statement fails
+     */
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /**
+     * Creates a view named {@code globalViewName} as SELECT * FROM {@code tableName}, adding
+     * PK1 and PK2 BIGINT columns as the view's primary key constraint.
+     *
+     * @param conn connection the DDL is executed on
+     * @param tableName table the view selects from
+     * @param globalViewName name of the view to create
+     * @throws Exception if creating or executing the statement fails
+     */
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);

Review comment:
       Create tenantConn inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Fixed test values; static because they do not depend on the test instance.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    /**
+     * Sets the VIEW_TTL property of an existing view via an ALTER VIEW statement.
+     *
+     * @param conn connection the DDL is executed on
+     * @param viewName name of the view to alter
+     * @param view_ttl_value value assigned to VIEW_TTL
+     * @throws SQLException if creating or executing the statement fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds (PK1, PK2) primary key columns and sets
+     * VIEW_TTL, then upserts NUMBER_OF_UPSERT_ROWS rows into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Creates a view with the given VIEW_TTL as SELECT * FROM {@code tableName}, then upserts
+     * NUMBER_OF_UPSERT_ROWS rows (ID, PK1, PK2, NUM) into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Asserts that an unfiltered scan of the HBase table {@code tableName} returns exactly
+     * {@code expectedRows} rows.
+     *
+     * @param tableName HBase table to scan
+     * @param expectedRows number of rows the scan must return
+     * @throws Exception if the connection, table or scan fails
+     */
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        // The connection is its own resource so it is closed too, not only the Table.
+        // NOTE(review): assumes createConnection(config) returns HConnection (inferred from the
+        // getTable(String) call) - confirm against HConnectionFactory.
+        try (org.apache.hadoop.hbase.client.HConnection hConnection =
+                     HBaseFactoryProvider.getHConnectionFactory().createConnection(config);
+             Table table = hConnection.getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {

Review comment:
       Same here: should we close the underlying HConnection as well?

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Fixed test values; static because they do not depend on the test instance.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    /**
+     * Sets the VIEW_TTL property of an existing view via an ALTER VIEW statement.
+     *
+     * @param conn connection the DDL is executed on
+     * @param viewName name of the view to alter
+     * @param view_ttl_value value assigned to VIEW_TTL
+     * @throws SQLException if creating or executing the statement fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the Statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds (PK1, PK2) primary key columns and sets
+     * VIEW_TTL, then upserts NUMBER_OF_UPSERT_ROWS rows into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Creates a view with the given VIEW_TTL as SELECT * FROM {@code tableName}, then upserts
+     * NUMBER_OF_UPSERT_ROWS rows (ID, PK1, PK2, NUM) into the view and commits.
+     *
+     * @param conn connection used for the DDL, the upserts and the commit
+     * @param tableName table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value value assigned to VIEW_TTL in the CREATE VIEW statement
+     * @throws SQLException if any statement or the commit fails
+     */
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        // try-with-resources closes the DDL Statement even when execute() throws.
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // The UPSERT text is identical for every row: prepare it once, rebind per row, and let
+        // try-with-resources close it even if a row fails.
+        try (PreparedStatement stmt = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                stmt.setString(1, generateUniqueName());
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {

Review comment:
       Shouldn't we close the HConnection as well? I don't think table closing will automatically close the underlying HBase connection.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();

Review comment:
       nit: Since `schema` and `baseTable` aren't really used again, can we just do `String fullTableName =  generateUniqueName() + "." + generateUniqueName()`? Similarly for the other test cases.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {

Review comment:
       Create tenantConn inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            //running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);

Review comment:
       Create tenantConn inside try-with-resources
   
   

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLDeleteJobMapper.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+public class ViewTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            if (config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ) != null) {
+                LOGGER.info("Using customized tracker class : " +
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+                defaultViewDeletionTrackerClass = Class.forName(
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+            } else {
+                LOGGER.info("Using default tracker class ");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker) defaultViewDeletionTrackerClass.newInstance();
+        } catch (Exception e) {
+            LOGGER.error("Getting exception While initializing initMultiViewJobStatusTracker with error message");
+            LOGGER.error("stack trace" + e.getStackTrace().toString());
+            throw e;
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, ViewInfoTracker value, Context context) throws IOException  {
+        try {
+            final Configuration config = context.getConfiguration();
+
+            if (this.multiViewJobStatusTracker == null) {
+                initMultiViewJobStatusTracker(config);
+            }
+
+            LOGGER.debug(String.format("Deleting from view %s, TenantID %s, and TTL value: %d",
+                    value.getViewName(), value.getTenantId(), value.getViewTtl()));
+
+            try (PhoenixConnection connection = (PhoenixConnection) ConnectionUtil.getInputConnection(config)) {
+                if (value.getTenantId() != null && !value.getTenantId().equals("NULL")) {
+                    try (PhoenixConnection tenantConnection = (PhoenixConnection) PhoenixMultiInputUtil.
+                            buildTenantConnection(connection.getURL(), value.getTenantId())) {
+                        deletingExpiredRows(tenantConnection, value, config);
+                    }
+                } else {
+                    deletingExpiredRows(connection, value, config);
+                }
+            }
+        } catch (SQLException e) {
+            LOGGER.error("Mapper got an exception while deleting expired rows : " + e.getMessage() );
+            throw new IOException(e.getMessage(), e.getCause());
+        } catch (Exception e) {
+            LOGGER.error("Getting IOException while running View TTL Deletion Job mapper with error : "
+                    + e.getMessage());
+            throw new IOException(e.getMessage(), e.getCause());
+        }
+    }
+
+    private void deletingExpiredRows(PhoenixConnection connection, ViewInfoTracker view, Configuration config)
+            throws SQLException {
+
+        String deleteIfExpiredStatement = "DELETE FROM " + view.getViewName() +
+                " WHERE TO_NUMBER(NOW()) - TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) > " + view.getViewTtl();
+
+        try {
+            this.multiViewJobStatusTracker.updateJobStatus(view, 0,
+                    ViewInfoJobState.PREP.getValue(), config, 0);
+
+            long startTime = System.currentTimeMillis();
+            this.multiViewJobStatusTracker.updateJobStatus(view, 0,
+                    ViewInfoJobState.RUNNING.getValue(), config, 0 );
+            connection.setAutoCommit(true);
+            int numberOfDeletedRows = connection.createStatement().executeUpdate(deleteIfExpiredStatement);

Review comment:
       Create the statement inside try-with-resources. We have to be very careful not to leak any connections/statements, etc. from our own framework. Can you please ensure that throughout? I may have missed some.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;

Review comment:
       Same for these variables and other such instances throughout the other test cases.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+        boolean isQueryMore =
+                configuration.get(PhoenixConfigurationUtil.MAPREDUCE_VIEW_TTL_DELETE_JOB_ALL_VIEWS) != null;
+        int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration, new Properties())){
+            TableName catalogOrChildTableName = ViewUtil.getSystemTableForChildLinks(0, configuration);
+            Table catalogOrChildTable = connection.getQueryServices()
+                    .getTable(SchemaUtil.getPhysicalName(catalogOrChildTableName.toBytes(),
+                            connection.getQueryServices().getProps())
+                            .getName());
+            do {
+                ResultSet viewRs = connection.createStatement().executeQuery(query);

Review comment:
       Create the statement inside a try-with-resources block.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            //running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            tenant2Connection.createStatement().execute(
+                    "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String globalViewName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);

Review comment:
       Create tenantConn inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            //running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            tenant2Connection.createStatement().execute(
+                    "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String globalViewName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {

Review comment:
       Create tenantConn inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            //running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            tenant2Connection.createStatement().execute(
+                    "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String globalViewName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            //create global views with PK
+            createGlobalViewWithPk(globalConn, fullTableName, globalViewName);
+
+            //create global views on top of global view
+            createViewOnGlobalViewAndUpsertData(tenant1Connection, globalViewName, fullViewName1,
+                    VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewOnGlobalViewAndUpsertData(tenant2Connection, globalViewName, fullViewName2,
+                    VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from the multi-tenant table
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testAllViewCases() throws Exception {
+        String schema1 = generateUniqueName();
+        String baseTable1 = generateUniqueName();
+        String baseTable2 = generateUniqueName();
+        String globalTable = generateUniqueName();
+
+        String fullTable11 = schema1 + "." + baseTable1;
+        String fullTable12 = schema1 + "." + baseTable2;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTable12;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+            createMultiTenantTable(globalConn, fullTable11);
+            createMultiTenantTable(globalConn, fullTable12);
+
+            // NOT MULTI_TENANT TABLE
+            globalConn.createStatement().execute(String.format(

Review comment:
       Create the Statement inside a try-with-resources block so that it is always closed

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            //running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            tenant2Connection.createStatement().execute(
+                    "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String globalViewName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            //create global views with PK
+            createGlobalViewWithPk(globalConn, fullTableName, globalViewName);
+
+            //create global views on top of global view
+            createViewOnGlobalViewAndUpsertData(tenant1Connection, globalViewName, fullViewName1,
+                    VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewOnGlobalViewAndUpsertData(tenant2Connection, globalViewName, fullViewName2,
+                    VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from the multi-tenant table
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testAllViewCases() throws Exception {
+        String schema1 = generateUniqueName();
+        String baseTable1 = generateUniqueName();
+        String baseTable2 = generateUniqueName();
+        String globalTable = generateUniqueName();
+
+        String fullTable11 = schema1 + "." + baseTable1;
+        String fullTable12 = schema1 + "." + baseTable2;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTable12;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {

Review comment:
       Create tenantConn inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Named literals; their call sites are not visible in this hunk.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    // Number of rows upserted by each data-loading helper.
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    // VIEW_TTL values; the day constant is 1000 * 60 * 60 * 24, i.e. milliseconds per day.
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    // Explicit long arithmetic so the product can never be evaluated in int.
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000L * 60 * 60 * 24;
+
+    /**
+     * Executes {@code ALTER VIEW ... SET VIEW_TTL} for the given view.
+     *
+     * @param conn connection on which the DDL is executed
+     * @param viewName name of the view to alter
+     * @param view_ttl_value new VIEW_TTL value
+     * @throws SQLException if the DDL fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view with two extra PK columns (PK1, PK2) and the given VIEW_TTL on top of
+     * {@code tableName}, upserts NUMBER_OF_UPSERT_ROWS rows through it and commits.
+     *
+     * @param conn connection used for the DDL and the upserts
+     * @param tableName parent table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value VIEW_TTL value for the new view
+     * @throws SQLException if the DDL, an upsert or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // Prepare the UPSERT once and reuse it; try-with-resources closes it on every path.
+        try (PreparedStatement upsert = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                upsert.setString(1, generateUniqueName());
+                upsert.setInt(2, i);
+                upsert.setInt(3, i);
+                upsert.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Creates a view with the given VIEW_TTL on top of {@code tableName} (adding no columns of
+     * its own), upserts NUMBER_OF_UPSERT_ROWS rows (ID, PK1, PK2, NUM) through it and commits.
+     *
+     * @param conn connection used for the DDL and the upserts
+     * @param tableName parent the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value VIEW_TTL value for the new view
+     * @throws SQLException if the DDL, an upsert or the commit fails
+     */
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // Prepare the UPSERT once and reuse it; try-with-resources closes it on every path.
+        try (PreparedStatement upsert = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                upsert.setString(1, generateUniqueName());
+                upsert.setInt(2, i);
+                upsert.setInt(3, i);
+                upsert.setInt(4, i);
+                upsert.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Asserts that an unfiltered scan of the given HBase table yields {@code expectedRows} rows.
+     *
+     * @param tableName name of the HBase table to scan
+     * @param expectedRows number of rows the scan must return
+     */
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        // NOTE(review): the connection returned by createConnection(config) is not closed here;
+        // only the Table is. Confirm whether the connection should be closed as well.
+        try (Table hbaseTable = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            long actualRows = getRowCount(hbaseTable, new Scan());
+            assertEquals(expectedRows, actualRows);
+        }
+    }
+
+    /**
+     * Asserts that the number of rows in the given HBase table whose row key matches
+     * {@code regrex} equals {@code expectedRows}.
+     *
+     * @param tableName name of the HBase table to scan
+     * @param regrex regular expression applied to row keys through a RowFilter
+     * @param expectedRows number of matching rows expected
+     */
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        // NOTE(review): the connection returned by createConnection(config) is not closed here;
+        // only the Table is. Confirm whether the connection should be closed as well.
+        // The former catch block only rethrew the exception, so it has been dropped.
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        }
+    }
+
+    /**
+     * Asserts that {@code SELECT COUNT(*)} over the table, optionally restricted to a single
+     * TENANT_ID, returns the expected number of rows.
+     *
+     * @param tableName table to count rows in
+     * @param tenantId tenant to filter on, or {@code null} to count every row
+     * @param expectedRows expected count
+     * @param conn connection used to run the query
+     */
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        // Scope the statement and its result set together so both are closed explicitly.
+        try (Statement stm = conn.createStatement();
+             ResultSet rs = stm.executeQuery(query)) {
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    /**
+     * Counts the rows returned by running {@code scan} against {@code table}.
+     *
+     * @param table HBase table to scan
+     * @param scan scan (with any filter already set) to execute
+     * @return number of results the scanner produced
+     */
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        // long counter matches the declared return type.
+        long count = 0;
+        // try-with-resources closes the scanner even if iteration throws.
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            for (Result ignored : scanner) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Creates a MULTI_TENANT table keyed on (TENANT_ID, ID) with a NUM column.
+     *
+     * @param conn connection on which the DDL is executed
+     * @param tableName name of the table to create
+     */
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        // try-with-resources closes the statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds the PK columns PK1 and PK2.
+     *
+     * @param conn connection on which the DDL is executed
+     * @param tableName table the view selects from
+     * @param globalViewName name of the view to create
+     */
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        // try-with-resources closes the statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /**
+     * Two tenant views over one multi-tenant table, one created with the 1 ms VIEW_TTL and one
+     * with the one-day VIEW_TTL. After a single ViewTTLTool run only the first tenant's rows
+     * are expected to be gone.
+     */
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        final String schema = generateUniqueName();
+        final String fullTableName = schema + "." + generateUniqueName();
+
+        final String tenant1 = generateUniqueName();
+        final String tenant2 = generateUniqueName();
+
+        final String fullViewName1 = schema + "." + generateUniqueName();
+        final String fullViewName2 = schema + "." + generateUniqueName();
+
+        final int rowsPerTenant = NUMBER_OF_UPSERT_ROWS;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // Before the job: both tenants' rows are present.
+            verifyNumberOfRows(fullTableName, null, rowsPerTenant * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, rowsPerTenant, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+
+            // Run the MR job that deletes expired rows.
+            ViewTTLTool tool = new ViewTTLTool();
+            tool.setConf(new Configuration(getUtility().getConfiguration()));
+            int exitCode = tool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, exitCode);
+
+            // After the job: only tenant2's rows remain.
+            verifyNumberOfRows(fullTableName, null, rowsPerTenant, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+        }
+    }
+
+    /**
+     * Two tenant views over one multi-tenant table, with an index on the second tenant's view.
+     * The first ViewTTLTool run should remove only tenant1's rows; after tenant2's VIEW_TTL is
+     * lowered to 1 ms, the second run should remove tenant2's data rows and index rows.
+     */
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // try-with-resources closes the index DDL statement even when execute() throws.
+            try (Statement indexStmt = tenant2Connection.createStatement()) {
+                indexStmt.execute(
+                        "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+            }
+
+            // before running the MR delete job, all rows should be present in the multi-tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view TTL so that all of tenant2's rows expire immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    /**
+     * Two tenant views created on top of a global view (which adds PK columns) over a
+     * multi-tenant table. The first ViewTTLTool run should remove only tenant1's rows; after
+     * tenant2's VIEW_TTL is lowered to 1 ms, the second run should remove tenant2's rows too.
+     */
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        final String schema = generateUniqueName();
+        final String fullTableName = schema + "." + generateUniqueName();
+
+        final String tenant1 = generateUniqueName();
+        final String tenant2 = generateUniqueName();
+
+        final String globalViewName = generateUniqueName();
+        final String fullViewName1 = schema + "." + generateUniqueName();
+        final String fullViewName2 = schema + "." + generateUniqueName();
+
+        final int rowsPerTenant = NUMBER_OF_UPSERT_ROWS;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            // Global view that adds the PK columns.
+            createGlobalViewWithPk(globalConn, fullTableName, globalViewName);
+
+            // One view per tenant connection on top of the global view.
+            createViewOnGlobalViewAndUpsertData(tenant1Connection, globalViewName, fullViewName1,
+                    VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewOnGlobalViewAndUpsertData(tenant2Connection, globalViewName, fullViewName2,
+                    VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // Before the job: both tenants' rows are present in the multi-tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, rowsPerTenant, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+
+            // Run the MR job that deletes expired rows.
+            ViewTTLTool tool = new ViewTTLTool();
+            tool.setConf(new Configuration(getUtility().getConfiguration()));
+            int exitCode = tool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, exitCode);
+
+            // First run: tenant1's rows are deleted, tenant2's are kept.
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+
+            // Lower tenant2's VIEW_TTL so that all of its rows expire immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            exitCode = tool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, exitCode);
+
+            // Second run: tenant2's rows are deleted from the multi-tenant table.
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testAllViewCases() throws Exception {
+        String schema1 = generateUniqueName();
+        String baseTable1 = generateUniqueName();
+        String baseTable2 = generateUniqueName();
+        String globalTable = generateUniqueName();
+
+        String fullTable11 = schema1 + "." + baseTable1;
+        String fullTable12 = schema1 + "." + baseTable2;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTable12;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+            createMultiTenantTable(globalConn, fullTable11);
+            createMultiTenantTable(globalConn, fullTable12);
+
+            // NOT MULTI_TENANT TABLE
+            globalConn.createStatement().execute(String.format(
+                    "CREATE TABLE %s (ID BIGINT PRIMARY KEY, NUM BIGINT) COLUMN_ENCODED_BYTES = 0", globalTable));
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                PreparedStatement stmt = globalConn.prepareStatement(
+                        "UPSERT INTO " + globalTable + " (ID, NUM) VALUES(?,?)");
+                stmt.setInt(1, i);
+                stmt.setInt(2, i);
+                stmt.execute();
+                stmt.close();
+            }
+            globalConn.commit();
+
+            globalConn.createStatement().execute(String.format("CREATE VIEW %s AS SELECT * FROM %s " +
+                    "WHERE NUM = 1", viewName1, globalTable));
+            globalConn.createStatement().execute(

Review comment:
       Ditto: everywhere a statement is created, create it inside try-with-resources so that it is auto-closed.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    // Named literals; their call sites are not visible in this hunk.
+    private static final int ONE = 1;
+    private static final int ZERO = 0;
+    // Number of rows upserted by each data-loading helper.
+    private static final int NUMBER_OF_UPSERT_ROWS = 202;
+    // VIEW_TTL values; the day constant is 1000 * 60 * 60 * 24, i.e. milliseconds per day.
+    private static final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    // Explicit long arithmetic so the product can never be evaluated in int.
+    private static final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000L * 60 * 60 * 24;
+
+    /**
+     * Executes {@code ALTER VIEW ... SET VIEW_TTL} for the given view.
+     *
+     * @param conn connection on which the DDL is executed
+     * @param viewName name of the view to alter
+     * @param view_ttl_value new VIEW_TTL value
+     * @throws SQLException if the DDL fails
+     */
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        // try-with-resources closes the statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+        }
+    }
+
+    /**
+     * Creates a view with two extra PK columns (PK1, PK2) and the given VIEW_TTL on top of
+     * {@code tableName}, upserts NUMBER_OF_UPSERT_ROWS rows through it and commits.
+     *
+     * @param conn connection used for the DDL and the upserts
+     * @param tableName parent table (or view) the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value VIEW_TTL value for the new view
+     * @throws SQLException if the DDL, an upsert or the commit fails
+     */
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // Prepare the UPSERT once and reuse it; try-with-resources closes it on every path.
+        try (PreparedStatement upsert = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                upsert.setString(1, generateUniqueName());
+                upsert.setInt(2, i);
+                upsert.setInt(3, i);
+                upsert.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Creates a view with the given VIEW_TTL on top of {@code tableName} (adding no columns of
+     * its own), upserts NUMBER_OF_UPSERT_ROWS rows (ID, PK1, PK2, NUM) through it and commits.
+     *
+     * @param conn connection used for the DDL and the upserts
+     * @param tableName parent the new view selects from
+     * @param viewName name of the view to create
+     * @param view_ttl_value VIEW_TTL value for the new view
+     * @throws SQLException if the DDL, an upsert or the commit fails
+     */
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        try (Statement ddlStmt = conn.createStatement()) {
+            ddlStmt.execute(ddl);
+        }
+
+        // Prepare the UPSERT once and reuse it; try-with-resources closes it on every path.
+        try (PreparedStatement upsert = conn.prepareStatement(
+                "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)")) {
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                upsert.setString(1, generateUniqueName());
+                upsert.setInt(2, i);
+                upsert.setInt(3, i);
+                upsert.setInt(4, i);
+                upsert.execute();
+            }
+        }
+        conn.commit();
+    }
+
+    /**
+     * Asserts that an unfiltered scan of the given HBase table yields {@code expectedRows} rows.
+     *
+     * @param tableName name of the HBase table to scan
+     * @param expectedRows number of rows the scan must return
+     */
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        // NOTE(review): the connection returned by createConnection(config) is not closed here;
+        // only the Table is. Confirm whether the connection should be closed as well.
+        try (Table hbaseTable = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            long actualRows = getRowCount(hbaseTable, new Scan());
+            assertEquals(expectedRows, actualRows);
+        }
+    }
+
+    /**
+     * Asserts that the number of rows in the given HBase table whose row key matches
+     * {@code regrex} equals {@code expectedRows}.
+     *
+     * @param tableName name of the HBase table to scan
+     * @param regrex regular expression applied to row keys through a RowFilter
+     * @param expectedRows number of matching rows expected
+     */
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        // NOTE(review): the connection returned by createConnection(config) is not closed here;
+        // only the Table is. Confirm whether the connection should be closed as well.
+        // The former catch block only rethrew the exception, so it has been dropped.
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        }
+    }
+
+    /**
+     * Asserts that {@code SELECT COUNT(*)} over the table, optionally restricted to a single
+     * TENANT_ID, returns the expected number of rows.
+     *
+     * @param tableName table to count rows in
+     * @param tenantId tenant to filter on, or {@code null} to count every row
+     * @param expectedRows expected count
+     * @param conn connection used to run the query
+     */
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        // Scope the statement and its result set together so both are closed explicitly.
+        try (Statement stm = conn.createStatement();
+             ResultSet rs = stm.executeQuery(query)) {
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    /**
+     * Counts the rows returned by running {@code scan} against {@code table}.
+     *
+     * @param table HBase table to scan
+     * @param scan scan (with any filter already set) to execute
+     * @return number of results the scanner produced
+     */
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        // long counter matches the declared return type.
+        long count = 0;
+        // try-with-resources closes the scanner even if iteration throws.
+        try (ResultScanner scanner = table.getScanner(scan)) {
+            for (Result ignored : scanner) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Creates a MULTI_TENANT table keyed on (TENANT_ID, ID) with a NUM column.
+     *
+     * @param conn connection on which the DDL is executed
+     * @param tableName name of the table to create
+     */
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        // try-with-resources closes the statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /**
+     * Creates a view over {@code tableName} that adds the PK columns PK1 and PK2.
+     *
+     * @param conn connection on which the DDL is executed
+     * @param tableName table the view selects from
+     * @param globalViewName name of the view to create
+     */
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        // try-with-resources closes the statement even when execute() throws.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /**
+     * Two tenant views over one multi-tenant table, one created with the 1 ms VIEW_TTL and one
+     * with the one-day VIEW_TTL. After a single ViewTTLTool run only the first tenant's rows
+     * are expected to be gone.
+     */
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        final String schema = generateUniqueName();
+        final String fullTableName = schema + "." + generateUniqueName();
+
+        final String tenant1 = generateUniqueName();
+        final String tenant2 = generateUniqueName();
+
+        final String fullViewName1 = schema + "." + generateUniqueName();
+        final String fullViewName2 = schema + "." + generateUniqueName();
+
+        final int rowsPerTenant = NUMBER_OF_UPSERT_ROWS;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // Before the job: both tenants' rows are present.
+            verifyNumberOfRows(fullTableName, null, rowsPerTenant * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, rowsPerTenant, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+
+            // Run the MR job that deletes expired rows.
+            ViewTTLTool tool = new ViewTTLTool();
+            tool.setConf(new Configuration(getUtility().getConfiguration()));
+            int exitCode = tool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, exitCode);
+
+            // After the job: only tenant2's rows remain.
+            verifyNumberOfRows(fullTableName, null, rowsPerTenant, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+        }
+    }
+
+    /**
+     * Two tenant views over one multi-tenant table, with an index on the second tenant's view.
+     * The first ViewTTLTool run should remove only tenant1's rows; after tenant2's VIEW_TTL is
+     * lowered to 1 ms, the second run should remove tenant2's data rows and index rows.
+     */
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // try-with-resources closes the index DDL statement even when execute() throws.
+            try (Statement indexStmt = tenant2Connection.createStatement()) {
+                indexStmt.execute(
+                        "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+            }
+
+            // before running the MR delete job, all rows should be present in the multi-tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view TTL so that all of tenant2's rows expire immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    /**
+     * Two tenant views created on top of a global view (which adds PK columns) over a
+     * multi-tenant table. The first ViewTTLTool run should remove only tenant1's rows; after
+     * tenant2's VIEW_TTL is lowered to 1 ms, the second run should remove tenant2's rows too.
+     */
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        final String schema = generateUniqueName();
+        final String fullTableName = schema + "." + generateUniqueName();
+
+        final String tenant1 = generateUniqueName();
+        final String tenant2 = generateUniqueName();
+
+        final String globalViewName = generateUniqueName();
+        final String fullViewName1 = schema + "." + generateUniqueName();
+        final String fullViewName2 = schema + "." + generateUniqueName();
+
+        final int rowsPerTenant = NUMBER_OF_UPSERT_ROWS;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            // Global view that adds the PK columns.
+            createGlobalViewWithPk(globalConn, fullTableName, globalViewName);
+
+            // One view per tenant connection on top of the global view.
+            createViewOnGlobalViewAndUpsertData(tenant1Connection, globalViewName, fullViewName1,
+                    VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewOnGlobalViewAndUpsertData(tenant2Connection, globalViewName, fullViewName2,
+                    VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // Before the job: both tenants' rows are present in the multi-tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, rowsPerTenant, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+
+            // Run the MR job that deletes expired rows.
+            ViewTTLTool tool = new ViewTTLTool();
+            tool.setConf(new Configuration(getUtility().getConfiguration()));
+            int exitCode = tool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, exitCode);
+
+            // First run: tenant1's rows are deleted, tenant2's are kept.
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, rowsPerTenant, globalConn);
+
+            // Lower tenant2's VIEW_TTL so that all of its rows expire immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            exitCode = tool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, exitCode);
+
+            // Second run: tenant2's rows are deleted from the multi-tenant table.
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testAllViewCases() throws Exception {
+        String schema1 = generateUniqueName();
+        String baseTable1 = generateUniqueName();
+        String baseTable2 = generateUniqueName();
+        String globalTable = generateUniqueName();
+
+        String fullTable11 = schema1 + "." + baseTable1;
+        String fullTable12 = schema1 + "." + baseTable2;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTable12;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+            createMultiTenantTable(globalConn, fullTable11);
+            createMultiTenantTable(globalConn, fullTable12);
+
+            // NOT MULTI_TENANT TABLE
+            globalConn.createStatement().execute(String.format(
+                    "CREATE TABLE %s (ID BIGINT PRIMARY KEY, NUM BIGINT) COLUMN_ENCODED_BYTES = 0", globalTable));
+            for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+                PreparedStatement stmt = globalConn.prepareStatement(
+                        "UPSERT INTO " + globalTable + " (ID, NUM) VALUES(?,?)");
+                stmt.setInt(1, i);
+                stmt.setInt(2, i);
+                stmt.execute();
+                stmt.close();
+            }
+            globalConn.commit();
+
+            globalConn.createStatement().execute(String.format("CREATE VIEW %s AS SELECT * FROM %s " +

Review comment:
       Create statement inside try-with-resources

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            //running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            tenant2Connection.createStatement().execute(
+                    "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String globalViewName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            //create global views with PK
+            createGlobalViewWithPk(globalConn, fullTableName, globalViewName);
+
+            //create global views on top of global view
+            createViewOnGlobalViewAndUpsertData(tenant1Connection, globalViewName, fullViewName1,
+                    VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewOnGlobalViewAndUpsertData(tenant2Connection, globalViewName, fullViewName2,
+                    VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from the multi-tenant table
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testAllViewCases() throws Exception {
+        String schema1 = generateUniqueName();
+        String baseTable1 = generateUniqueName();
+        String baseTable2 = generateUniqueName();
+        String globalTable = generateUniqueName();
+
+        String fullTable11 = schema1 + "." + baseTable1;
+        String fullTable12 = schema1 + "." + baseTable2;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTable12;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);

Review comment:
       Create tenantConn inside a try-with-resources block.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLToolIT.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.ViewTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class ViewTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final int ONE = 1;
+    private final int ZERO = 0;
+    private final int NUMBER_OF_UPSERT_ROWS = 202;
+    private final long VIEW_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long VIEW_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private void alterViewTtl(Connection conn, String viewName, long view_ttl_value)
+            throws SQLException {
+        conn.createStatement().execute(
+                String.format("ALTER VIEW %s SET VIEW_TTL= %d", viewName, view_ttl_value));
+    }
+
+    private void createViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                         long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) " +
+                "AS SELECT * FROM " + tableName + " VIEW_TTL=" + view_ttl_value;
+
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2) VALUES(?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void createViewOnGlobalViewAndUpsertData(Connection conn, String tableName, String viewName,
+                                                     long view_ttl_value) throws SQLException {
+        String ddl = "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + " VIEW_TTL= " + view_ttl_value;
+        conn.createStatement().execute(ddl);
+
+        for (int i = 0; i < NUMBER_OF_UPSERT_ROWS; i++) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + viewName + " (ID, PK1, PK2, NUM) VALUES(?,?,?,?)");
+            stmt.setString(1, generateUniqueName());
+            stmt.setInt(2, i);
+            stmt.setInt(3, i);
+            stmt.setInt(4, i);
+            stmt.execute();
+            stmt.close();
+        }
+        conn.commit();
+    }
+
+    private void verifyGlobalTableNumberOfRows(String tableName, int expectedRows) throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            assertEquals(expectedRows, getRowCount(table, new Scan()));
+        }
+    }
+
+    private void verifyIndexTableNumberOfRowsForATenant(String tableName, String regrex, int expectedRows)
+            throws Exception {
+        try (Table table  = HBaseFactoryProvider.getHConnectionFactory().createConnection(config).getTable(tableName)) {
+            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table,scan));
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+                                    Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        int count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        conn.createStatement().execute(ddl);
+    }
+
+    private void createGlobalViewWithPk(Connection conn, String tableName, String globalViewName) throws Exception {
+        String ddl = "CREATE VIEW " + globalViewName +
+                " (PK1 BIGINT NOT NULL, PK2 BIGINT NOT NULL CONSTRAINT PKVIEW PRIMARY KEY(PK1,PK2)) AS SELECT * FROM "
+                + tableName;
+        conn.createStatement().execute(ddl);
+    }
+
+    @Test
+    public void testViewOnMultiTenantTable() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS * 2, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            //running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            verifyNumberOfRows(fullTableName, null, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnMultiTenantTableCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTableName;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            createViewAndUpsertData(tenant1Connection, fullTableName, fullViewName1, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewAndUpsertData(tenant2Connection, fullTableName, fullViewName2, VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            tenant2Connection.createStatement().execute(
+                    "CREATE INDEX " + indexView + " ON " + fullViewName2 + "(NUM) INCLUDE (ID)");
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyGlobalTableNumberOfRows(indexTable, NUMBER_OF_UPSERT_ROWS);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg", "-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from multi-tenant table and index table for tenant2.
+            verifyGlobalTableNumberOfRows(indexTable, 0);
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testTenantViewOnGlobalViewWithPkCases() throws Exception {
+        String schema = generateUniqueName();
+        String baseTable = generateUniqueName();
+        String fullTableName = schema + "." + baseTable;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String globalViewName = generateUniqueName();
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String fullViewName1 = schema + "." + viewName1;
+        String fullViewName2 = schema + "." + viewName2;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
+
+            createMultiTenantTable(globalConn, fullTableName);
+            //create global views with PK
+            createGlobalViewWithPk(globalConn, fullTableName, globalViewName);
+
+            //create global views on top of global view
+            createViewOnGlobalViewAndUpsertData(tenant1Connection, globalViewName, fullViewName1,
+                    VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+            createViewOnGlobalViewAndUpsertData(tenant2Connection, globalViewName, fullViewName2,
+                    VIEW_TTL_EXPIRE_IN_A_DAY);
+
+            // before running MR deleting job, all rows should be present in multi tenant table.
+            verifyNumberOfRows(fullTableName, tenant1, NUMBER_OF_UPSERT_ROWS, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // running MR job to delete expired rows.
+            ViewTTLTool viewTtlTool = new ViewTTLTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            viewTtlTool.setConf(conf);
+            int status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // first run should delete expired rows for tenant1 but not tenant2
+            verifyNumberOfRows(fullTableName, tenant1, 0, globalConn);
+            verifyNumberOfRows(fullTableName, tenant2, NUMBER_OF_UPSERT_ROWS, globalConn);
+
+            // alter the view ttl and all rows should expired immediately.
+            alterViewTtl(tenant2Connection, fullViewName2, VIEW_TTL_EXPIRE_IN_A_MILLISECOND);
+
+            status = viewTtlTool.run(new String[]{"-runfg","-a"});
+            assertEquals(0, status);
+
+            // MR job should delete rows from the multi-tenant table
+            verifyNumberOfRows(fullTableName, tenant2, 0, globalConn);
+        }
+    }
+
+    @Test
+    public void testAllViewCases() throws Exception {
+        String schema1 = generateUniqueName();
+        String baseTable1 = generateUniqueName();
+        String baseTable2 = generateUniqueName();
+        String globalTable = generateUniqueName();
+
+        String fullTable11 = schema1 + "." + baseTable1;
+        String fullTable12 = schema1 + "." + baseTable2;
+
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+
+        String viewName1 = generateUniqueName();
+        String viewName2 = generateUniqueName();
+
+        String indexView = viewName2 + "_INDEX";
+        String indexTable = "_IDX_" + fullTable12;
+
+        try (Connection globalConn = DriverManager.getConnection(getUrl());
+             Connection tenant1Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
+             Connection tenant2Connection = PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {

Review comment:
       The same applies throughout the other test cases; I don't want to spam you with the same comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org