Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2020/11/20 16:54:58 UTC

[GitHub] [phoenix] jpisaac commented on a change in pull request #975: PHOENIX-5592 MapReduce job to asynchronously delete rows where the VI…

jpisaac commented on a change in pull request #975:
URL: https://github.com/apache/phoenix/pull/975#discussion_r527746539



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.mapreduce.util.MultiViewSplitStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PhoenixMultiViewInputFormat<T extends Writable> extends InputFormat<NullWritable,T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMultiViewInputFormat.class);
+
+    public PhoenixMultiViewInputFormat() {
+    }
+

Review comment:
       Nit: remove the empty constructor; the implicit no-arg constructor is sufficient.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.mapreduce.util.MultiViewSplitStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+

Review comment:
       Nit: add class comments.
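       A minimal sketch of what the class comment could say (wording inferred from how the format is used, not taken from the patch):

           /**
            * InputFormat that produces one split per batch of Phoenix views (and view
            * indexes) so that a single mapper task can process several views.
            */
           public class PhoenixMultiViewInputFormat<T extends Writable> extends InputFormat<NullWritable, T> {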

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLDeleteJobMapper.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class ViewTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_RETRY_SLEEP_TIME_IN_MS = 10000;
+
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            if (config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ) != null) {
+                LOGGER.info("Using customized tracker class : " +
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+                defaultViewDeletionTrackerClass = Class.forName(
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+            } else {
+                LOGGER.info("Using default tracker class ");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker) defaultViewDeletionTrackerClass.newInstance();
+        } catch (Exception e) {
+            LOGGER.error("Getting exception While initializing initMultiViewJobStatusTracker with error message");
+            LOGGER.error("stack trace" + e.getStackTrace().toString());
+            throw e;
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, ViewInfoTracker value, Context context) throws IOException  {
+        try {
+            final Configuration config = context.getConfiguration();
+
+            if (this.multiViewJobStatusTracker == null) {
+                initMultiViewJobStatusTracker(config);
+            }
+
+            LOGGER.debug(String.format("Deleting from view %s, TenantID %s, and TTL value: %d",
+                    value.getViewName(), value.getTenantId(), value.getPhoenixTtl()));
+
+            deletingExpiredRows(value, config, context);
+
+        } catch (SQLException e) {
+            LOGGER.error("Mapper got an exception while deleting expired rows : " + e.getMessage() );
+            throw new IOException(e.getMessage(), e.getCause());
+        } catch (Exception e) {
+            LOGGER.error("Getting IOException while running View TTL Deletion Job mapper with error : "
+                    + e.getMessage());
+            throw new IOException(e.getMessage(), e.getCause());
+        }
+    }
+
+    private void deletingExpiredRows(ViewInfoTracker value, Configuration config, Context context) throws Exception {
+        try (PhoenixConnection connection = (PhoenixConnection) ConnectionUtil.getInputConnection(config)) {
+            if (value.getTenantId() != null && !value.getTenantId().equals("NULL")) {
+                Properties props = new Properties();
+                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, value.getTenantId());
+
+                try (PhoenixConnection tenantConnection = (PhoenixConnection)
+                        DriverManager.getConnection(connection.getURL(), props)) {
+                    deletingExpiredRows(tenantConnection, value, config, context);
+                }
+            } else {
+                deletingExpiredRows(connection, value, config, context);
+            }
+        }
+    }
+
+    private void deletingExpiredRows(PhoenixConnection connection, ViewInfoTracker viewInfoTracker,
+                                     Configuration config, Context context) throws Exception {
+        try {
+            PTable ptable = PhoenixRuntime.getTable(connection, viewInfoTracker.getViewName());
+            String deleteIfExpiredStatement = "SELECT /*+ NO_INDEX */ count(*) FROM " + viewInfoTracker.getViewName();
+
+            if (viewInfoTracker.isIndexRelation()) {
+                ptable = PhoenixRuntime.getTable(connection, viewInfoTracker.getRelationName());
+                deleteIfExpiredStatement = "SELECT count(*) FROM " + viewInfoTracker.getRelationName();
+            }
+
+            deletingExpiredRows(connection, ptable, deleteIfExpiredStatement, config, context, viewInfoTracker);
+
+        } catch (Exception e) {
+            LOGGER.error(String.format("Had an issue to process the view: %s, " +
+                    "see error %s ", viewInfoTracker.toString(),e.getMessage()));
+        }
+    }
+
+    /*
+     * Each Mapper that receives a PhoenixMultiViewInputSplit will execute a DeleteMutation/Scan
+     *  (with the DELETE_TTL_EXPIRED attribute) per view, for all the views and view indexes in the split.
+     * Each DeleteMutation is bounded by the view's start and stop keys for the region, the
+     *  TTL attributes, and the Delete Hint.
+     */
+    private boolean deletingExpiredRows(PhoenixConnection connection, PTable pTable,
+                                        String deleteIfExpiredStatement, Configuration config,
+                                        Context context, ViewInfoTracker viewInfoTracker) throws Exception {
+
+        try (PhoenixStatement pstmt = new PhoenixStatement(connection).unwrap(PhoenixStatement.class)) {
+            String sourceTableName = pTable.getTableName().getString();
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.PREP.getValue(), config, 0, context.getJobName(), sourceTableName);
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(pTable);
+            byte[] emptyColumnName =
+                    pTable.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            pTable.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, PDataType.FALSE_BYTES);

Review comment:
       We will need to remove this attribute: there are additional checks that reject a scan when both attributes are set.
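       For example, the attribute setup would shrink to a delete-only scan; a sketch, assuming the region observer rejects scans that carry both the DELETE and MASK attributes:

           // Set only the delete attribute; the mask attribute must not be set on the same scan.
           scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
           scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
           scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);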

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLDeleteJobMapper.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class ViewTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_RETRY_SLEEP_TIME_IN_MS = 10000;
+
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            if (config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ) != null) {
+                LOGGER.info("Using customized tracker class : " +
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+                defaultViewDeletionTrackerClass = Class.forName(
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+            } else {
+                LOGGER.info("Using default tracker class ");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker) defaultViewDeletionTrackerClass.newInstance();

Review comment:
       Is it necessary to cast it to the interface?
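       If the goal is to avoid the raw cast, one option is to resolve the subtype up front; a sketch, assuming any configured class implements the interface:

           // Resolve the tracker class as a subtype of the interface instead of casting the instance.
           Class<? extends MultiViewJobStatusTracker> trackerClass = DefaultMultiViewJobStatusTracker.class;
           String trackerClazzName = config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ);
           if (trackerClazzName != null) {
               trackerClass = Class.forName(trackerClazzName).asSubclass(MultiViewJobStatusTracker.class);
           }
           this.multiViewJobStatusTracker = trackerClass.newInstance();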

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLTool.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class ViewTTLTool extends Configured implements Tool {

Review comment:
       Nit: add class comments. Also, should the naming be PhoenixTTL?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+        boolean isQueryMore = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null;
+        int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)){
+            try (Statement stmt = connection.createStatement()) {
+                do {
+                    ResultSet viewRs = stmt.executeQuery(query);
+                    String schema = null;
+                    String tableName = null;
+                    String tenantId = null;
+                    String fullTableName = null;
+
+                    while (viewRs.next()) {
+                        schema = viewRs.getString(2);
+                        tableName = viewRs.getString(3);
+                        tenantId = viewRs.getString(1);
+                        fullTableName = tableName;
+                        Long viewTtlValue = viewRs.getLong(4);

Review comment:
       Nit: keep the getXXXX calls in column-index order?
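       For example, reading the columns in index order makes the mapping to the SELECT list easier to follow; a sketch that only reorders the existing calls:

           tenantId = viewRs.getString(1);
           schema = viewRs.getString(2);
           tableName = viewRs.getString(3);
           Long viewTtlValue = viewRs.getLong(4);
           fullTableName = tableName;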

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PhoenixMultiViewInputSplit extends InputSplit implements Writable {

Review comment:
       Nit: add class comments.
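       A sketch of a possible class comment, inferred from how the split is consumed:

           /**
            * InputSplit carrying a list of Phoenix views and view indexes; the mapper
            * that receives this split processes every view in the list.
            */
           public class PhoenixMultiViewInputSplit extends InputSplit implements Writable {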

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLDeleteJobMapper.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class ViewTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {

Review comment:
       Nit: add class comments. Also, do we want to change it to PhoenixTTL instead of ViewTTL?
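       A sketch of a possible class comment (the naming question applies to the class name as well):

           /**
            * Mapper that receives a PhoenixMultiViewInputSplit and, for each view or view
            * index in it, runs a delete-if-expired scan bounded by the view's key range.
            */
           public class ViewTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {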

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.mapreduce.util.MultiViewSplitStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PhoenixMultiViewInputFormat<T extends Writable> extends InputFormat<NullWritable,T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMultiViewInputFormat.class);
+
+    public PhoenixMultiViewInputFormat() {
+    }
+
+    @Override public List<InputSplit> getSplits(JobContext context) throws IOException {
+        List<InputSplit> listOfInputSplit = new ArrayList<>();
+        try {
+            final Configuration configuration = context.getConfiguration();
+            Class<?> defaultDeletionMultiInputStrategyClazz = DefaultPhoenixMultiViewListProvider.class;

Review comment:
       Nit: remove "deletion" from the variable name, since this class is not specific to Phoenix TTL deletion.
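       For example (a sketch of the rename only; the suggested name is illustrative):

           Class<?> defaultMultiViewListProviderClazz = DefaultPhoenixMultiViewListProvider.class;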

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class PhoenixMultiViewReader<T extends Writable> extends RecordReader<NullWritable,T> {

Review comment:
       Nit: add class comments.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PhoenixMultiViewInputSplit extends InputSplit implements Writable {
+
+    List<ViewInfoWritable> viewInfoTrackerList;
+
+    public PhoenixMultiViewInputSplit() {
+        this.viewInfoTrackerList = new ArrayList<>();
+    }
+
+    public PhoenixMultiViewInputSplit(List<ViewInfoWritable> viewInfoTracker) {
+        this.viewInfoTrackerList = viewInfoTracker;
+    }
+
+    @Override public void write(DataOutput output) throws IOException {
+        WritableUtils.writeVInt(output, this.viewInfoTrackerList.size());
+        for (ViewInfoWritable viewInfoWritable : this.viewInfoTrackerList) {
+            ViewInfoTracker viewInfoTracker = (ViewInfoTracker)viewInfoWritable;

Review comment:
       Can you add a check for the instance type? That way it is future-proof.
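       A sketch of the suggested guard, assuming the element's own write method is what follows the cast:

           for (ViewInfoWritable viewInfoWritable : this.viewInfoTrackerList) {
               if (!(viewInfoWritable instanceof ViewInfoTracker)) {
                   throw new IOException("Unexpected ViewInfoWritable type: "
                           + viewInfoWritable.getClass().getName());
               }
               ((ViewInfoTracker) viewInfoWritable).write(output);
           }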

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLDeleteJobMapper.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class ViewTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_RETRY_SLEEP_TIME_IN_MS = 10000;
+
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            if (config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ) != null) {
+                LOGGER.info("Using customized tracker class : " +
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+                defaultViewDeletionTrackerClass = Class.forName(
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+            } else {
+                LOGGER.info("Using default tracker class ");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker) defaultViewDeletionTrackerClass.newInstance();
+        } catch (Exception e) {
+            LOGGER.error("Getting exception While initializing initMultiViewJobStatusTracker with error message");
+            LOGGER.error("stack trace" + e.getStackTrace().toString());
+            throw e;
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, ViewInfoTracker value, Context context) throws IOException  {
+        try {
+            final Configuration config = context.getConfiguration();
+
+            if (this.multiViewJobStatusTracker == null) {
+                initMultiViewJobStatusTracker(config);
+            }
+
+            LOGGER.debug(String.format("Deleting from view %s, TenantID %s, and TTL value: %d",
+                    value.getViewName(), value.getTenantId(), value.getPhoenixTtl()));
+
+            deletingExpiredRows(value, config, context);
+
+        } catch (SQLException e) {
+            LOGGER.error("Mapper got an exception while deleting expired rows : " + e.getMessage() );
+            throw new IOException(e.getMessage(), e.getCause());
+        } catch (Exception e) {
+            LOGGER.error("Getting IOException while running View TTL Deletion Job mapper with error : "
+                    + e.getMessage());
+            throw new IOException(e.getMessage(), e.getCause());
+        }
+    }
+
+    private void deletingExpiredRows(ViewInfoTracker value, Configuration config, Context context) throws Exception {

Review comment:
       nit: naming - "deleteExpiredRows"

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLTool.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class ViewTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLTool.class);
+
+    public static enum MR_COUNTER_METRICS {
+        FAILED,
+        SUCCEED
+    }
+
+    public static final String ADDING_DELETION_MARKS_FOR_ALL_VIEWS = "ADDING_DELETION_MARKS_FOR_ALL_VIEWS";
+

Review comment:
       Nit: remove the empty lines?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLDeleteJobMapper.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class ViewTTLDeleteJobMapper extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_RETRY_SLEEP_TIME_IN_MS = 10000;
+
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            if (config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ) != null) {
+                LOGGER.info("Using customized tracker class : " +
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+                defaultViewDeletionTrackerClass = Class.forName(
+                        config.get(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+            } else {
+                LOGGER.info("Using default tracker class ");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker) defaultViewDeletionTrackerClass.newInstance();
+        } catch (Exception e) {
+            LOGGER.error("Getting exception While initializing initMultiViewJobStatusTracker with error message");
+            LOGGER.error("stack trace" + e.getStackTrace().toString());
+            throw e;
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, ViewInfoTracker value, Context context) throws IOException  {
+        try {
+            final Configuration config = context.getConfiguration();
+
+            if (this.multiViewJobStatusTracker == null) {
+                initMultiViewJobStatusTracker(config);
+            }
+
+            LOGGER.debug(String.format("Deleting from view %s, TenantID %s, and TTL value: %d",
+                    value.getViewName(), value.getTenantId(), value.getPhoenixTtl()));
+
+            deletingExpiredRows(value, config, context);
+
+        } catch (SQLException e) {
+            LOGGER.error("Mapper got an exception while deleting expired rows : " + e.getMessage() );
+            throw new IOException(e.getMessage(), e.getCause());
+        } catch (Exception e) {
+            LOGGER.error("Getting IOException while running View TTL Deletion Job mapper with error : "
+                    + e.getMessage());
+            throw new IOException(e.getMessage(), e.getCause());
+        }
+    }
+
+    private void deletingExpiredRows(ViewInfoTracker value, Configuration config, Context context) throws Exception {
+        try (PhoenixConnection connection = (PhoenixConnection) ConnectionUtil.getInputConnection(config)) {
+            if (value.getTenantId() != null && !value.getTenantId().equals("NULL")) {
+                Properties props = new Properties();
+                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, value.getTenantId());
+
+                try (PhoenixConnection tenantConnection = (PhoenixConnection)
+                        DriverManager.getConnection(connection.getURL(), props)) {
+                    deletingExpiredRows(tenantConnection, value, config, context);
+                }
+            } else {
+                deletingExpiredRows(connection, value, config, context);
+            }
+        }
+    }
+
+    private void deletingExpiredRows(PhoenixConnection connection, ViewInfoTracker viewInfoTracker,
+                                     Configuration config, Context context) throws Exception {
+        try {
+            PTable ptable = PhoenixRuntime.getTable(connection, viewInfoTracker.getViewName());
+            String deleteIfExpiredStatement = "SELECT /*+ NO_INDEX */ count(*) FROM " + viewInfoTracker.getViewName();
+
+            if (viewInfoTracker.isIndexRelation()) {
+                ptable = PhoenixRuntime.getTable(connection, viewInfoTracker.getRelationName());
+                deleteIfExpiredStatement = "SELECT count(*) FROM " + viewInfoTracker.getRelationName();
+            }
+
+            deletingExpiredRows(connection, ptable, deleteIfExpiredStatement, config, context, viewInfoTracker);
+
+        } catch (Exception e) {
+            LOGGER.error(String.format("Had an issue to process the view: %s, " +
+                    "see error %s ", viewInfoTracker.toString(),e.getMessage()));
+        }
+    }
+
+    /*
+     * Each Mapper that receives a PhoenixMultiViewInputSplit will execute a DeleteMutation/Scan
+     *  (with the DELETE_TTL_EXPIRED attribute) per view, for all the views and view indexes in the split.
+     * Each DeleteMutation is bounded by the view's start and stop keys for the region, the
+     *  TTL attributes, and the Delete Hint.
+     */
+    private boolean deletingExpiredRows(PhoenixConnection connection, PTable pTable,
+                                        String deleteIfExpiredStatement, Configuration config,
+                                        Context context, ViewInfoTracker viewInfoTracker) throws Exception {
+
+        try (PhoenixStatement pstmt = new PhoenixStatement(connection).unwrap(PhoenixStatement.class)) {
+            String sourceTableName = pTable.getTableName().getString();
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.PREP.getValue(), config, 0, context.getJobName(), sourceTableName);
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(pTable);
+            byte[] emptyColumnName =
+                    pTable.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            pTable.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, PDataType.FALSE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(viewInfoTracker.getPhoenixTtl())));

Review comment:
       Can you also set the attribute BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME?
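       A sketch of the extra attribute; the value shown (the name of the table the scan targets) is an assumption:

           scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME,
                   Bytes.toBytes(sourceTableName));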

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLTool.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class ViewTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLTool.class);
+
+    public static enum MR_COUNTER_METRICS {
+        FAILED,
+        SUCCEED
+    }
+
+    public static final String ADDING_DELETION_MARKS_FOR_ALL_VIEWS = "ADDING_DELETION_MARKS_FOR_ALL_VIEWS";
+
+    public static final int DEFAULT_MAPPER_SPLIT_SIZE = 10;
+
+    public static final int DEFAULT_QUERY_BATCH_SIZE = 100;
+
+    private static final Option DELETE_ALL_VIEW_OPTION = new Option("a", "all", false,

Review comment:
       Nit: use the plural: DELETE_ALL_VIEWS_OPTION.

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.ViewTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {
+
+    public List<InputSplit> generateSplits(List<ViewInfoWritable> views, Configuration configuration) {
+        int numViewsInSplit = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
+
+        if (numViewsInSplit < 1) {

Review comment:
       Check for numViewsInSplit <= 0 instead; otherwise a zero value can cause a divide-by-zero later.
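       A sketch of the suggested guard, assuming the default split size is the intended fallback:

           int numViewsInSplit = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
           if (numViewsInSplit <= 0) {
               numViewsInSplit = DEFAULT_MAPPER_SPLIT_SIZE;
           }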

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLTool.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class ViewTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLTool.class);
+
+    public static enum MR_COUNTER_METRICS {
+        FAILED,
+        SUCCEED
+    }
+
+    public static final String ADDING_DELETION_MARKS_FOR_ALL_VIEWS = "ADDING_DELETION_MARKS_FOR_ALL_VIEWS";
+
+    public static final int DEFAULT_MAPPER_SPLIT_SIZE = 10;
+
+    public static final int DEFAULT_QUERY_BATCH_SIZE = 100;
+
+    private static final Option DELETE_ALL_VIEW_OPTION = new Option("a", "all", false,
+            "Delete all views from all tables.");
+    private static final Option VIEW_NAME_OPTION = new Option("v", "view", true,
+            "Delete Phoenix View Name");
+    private static final Option TENANT_ID_OPTION = new Option("i", "id", true,
+            "Delete an view based on the tenant id.");
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0(highest) to 4");
+    private static final Option SPLIT_SIZE_OPTION = new Option("s", "split-size-per-mapper", true,
+            "Define split size for each mapper.");
+    private static final Option BATCH_SIZE_OPTION = new Option("b", "batch-size-for-query-more", true,
+            "Define batch size for fetching views metadata from syscat.");
+    private static final Option RUN_FOREGROUND_OPTION = new Option("runfg",
+            "run-foreground", false, "If specified, runs ViewTTLTool " +
+            "in Foreground. Default - Runs the build in background");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+    Configuration configuration;
+    Connection connection;
+
+    private String viewName;
+    private String tenantId;
+    private String jobName;
+    private boolean isDeletingAllViews;
+    private JobPriority jobPriority;
+    private boolean isForeground;
+    private int splitSize;
+    private int batchSize;
+    private Job job;
+
+    public void parseArgs(String[] args) {
+        CommandLine cmdLine;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            throw e;
+        }
+
+        if (getConf() == null) {
+            setConf(HBaseConfiguration.create());
+        }
+
+        if (cmdLine.hasOption(DELETE_ALL_VIEW_OPTION.getOpt())) {
+            this.isDeletingAllViews = true;
+        } else if (cmdLine.hasOption(VIEW_NAME_OPTION.getOpt())) {
+            viewName = cmdLine.getOptionValue(VIEW_NAME_OPTION.getOpt());
+            this.isDeletingAllViews = false;
+        }
+
+        if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            tenantId = cmdLine.getOptionValue((TENANT_ID_OPTION.getOpt()));
+        }
+
+        jobPriority = getJobPriority(cmdLine);
+        if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) {
+            splitSize = Integer.valueOf(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+        } else {
+            splitSize = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        if (cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt())) {
+            batchSize = Integer.valueOf(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+        } else {
+            batchSize = DEFAULT_QUERY_BATCH_SIZE;
+        }
+
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    }
+
+    public String getJobPriority() {
+        return this.jobPriority.toString();
+    }
+
+    private JobPriority getJobPriority(CommandLine cmdLine) {
+        String jobPriorityOption = cmdLine.getOptionValue(JOB_PRIORITY_OPTION.getOpt());
+        if (jobPriorityOption == null) {
+            return JobPriority.NORMAL;
+        }
+
+        switch (jobPriorityOption) {
+            case "0" : return JobPriority.VERY_HIGH;
+            case "1" : return JobPriority.HIGH;
+            case "2" : return JobPriority.NORMAL;
+            case "3" : return JobPriority.LOW;
+            case "4" : return JobPriority.VERY_LOW;
+            default:
+                return JobPriority.NORMAL;
+        }
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public boolean isDeletingAllViews() {
+        return this.isDeletingAllViews;
+    }
+
+    public String getTenantId() {
+        return this.tenantId;
+    }
+
+    public String getViewName() {
+        return this.viewName;
+    }
+
+    public int getSplitSize() {
+        return this.splitSize;
+    }
+
+    public int getBatchSize() {
+        return this.batchSize;
+    }
+
+    public CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+        }
+
+        if (!cmdLine.hasOption(DELETE_ALL_VIEW_OPTION.getOpt()) &&
+                !cmdLine.hasOption(VIEW_NAME_OPTION.getOpt()) &&
+                !cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            throw new IllegalStateException("No deletion job is specified, " +
+                    "please indicate deletion job for ALL/TABLE/VIEW/TENANT level");
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        this.jobPriority = getJobPriority(cmdLine);
+
+        return cmdLine;
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(DELETE_ALL_VIEW_OPTION);
+        options.addOption(VIEW_NAME_OPTION);
+        options.addOption(TENANT_ID_OPTION);
+        options.addOption(HELP_OPTION);
+        options.addOption(JOB_PRIORITY_OPTION);
+        options.addOption(RUN_FOREGROUND_OPTION);
+        options.addOption(SPLIT_SIZE_OPTION);
+        options.addOption(BATCH_SIZE_OPTION);
+
+        return options;
+    }
+
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);

Review comment:
       Do we want to log it here too?
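
       One possible shape, as a sketch (it reuses the existing LOGGER and the
       printHelpAndExit(Options, int) overload already in this class; the
       non-zero exit code is an assumption):

           private void printHelpAndExit(String errorMessage, Options options) {
               LOGGER.error(errorMessage);        // record the failure in the log as well
               System.err.println(errorMessage);
               printHelpAndExit(options, 1);      // assumed exit code on error
           }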

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLTool.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class ViewTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLTool.class);
+
+    public static enum MR_COUNTER_METRICS {
+        FAILED,
+        SUCCEED
+    }
+
+    public static final String ADDING_DELETION_MARKS_FOR_ALL_VIEWS = "ADDING_DELETION_MARKS_FOR_ALL_VIEWS";
+
+    public static final int DEFAULT_MAPPER_SPLIT_SIZE = 10;
+
+    public static final int DEFAULT_QUERY_BATCH_SIZE = 100;
+
+    private static final Option DELETE_ALL_VIEW_OPTION = new Option("a", "all", false,
+            "Delete all views from all tables.");
+    private static final Option VIEW_NAME_OPTION = new Option("v", "view", true,
+            "Delete Phoenix View Name");
+    private static final Option TENANT_ID_OPTION = new Option("i", "id", true,
+            "Delete an view based on the tenant id.");
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0(highest) to 4");
+    private static final Option SPLIT_SIZE_OPTION = new Option("s", "split-size-per-mapper", true,
+            "Define split size for each mapper.");
+    private static final Option BATCH_SIZE_OPTION = new Option("b", "batch-size-for-query-more", true,
+            "Define batch size for fetching views metadata from syscat.");
+    private static final Option RUN_FOREGROUND_OPTION = new Option("runfg",
+            "run-foreground", false, "If specified, runs ViewTTLTool " +
+            "in Foreground. Default - Runs the build in background");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+    Configuration configuration;
+    Connection connection;

Review comment:
       Any particular reason these have default (package-private) visibility rather than private?
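
       For instance (a sketch; keep package-private only if a test truly needs
       direct access):

           private Configuration configuration;
           private Connection connection;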

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.ViewTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {
+
+    public List<InputSplit> generateSplits(List<ViewInfoWritable> views, Configuration configuration) {
+        int numViewsInSplit = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
+
+        if (numViewsInSplit < 1) {
+            numViewsInSplit = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        int numberOfMappers = views.size() / numViewsInSplit;
+        if (Math.ceil(views.size() % numViewsInSplit) > 0) {
+            numberOfMappers++;
+        }
+
+        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(numberOfMappers);

Review comment:
       nit: camelCase variable naming
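
       e.g. (sketch):

           final List<InputSplit> pSplits = Lists.newArrayListWithExpectedSize(numberOfMappers);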

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLTool.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class ViewTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLTool.class);
+
+    public static enum MR_COUNTER_METRICS {
+        FAILED,
+        SUCCEED
+    }
+
+    public static final String ADDING_DELETION_MARKS_FOR_ALL_VIEWS = "ADDING_DELETION_MARKS_FOR_ALL_VIEWS";

Review comment:
       Do you want to rename this variable and its value to DELETE_ALL_VIEWS?
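
       i.e., something along these lines (sketch):

           public static final String DELETE_ALL_VIEWS = "DELETE_ALL_VIEWS";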

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {

Review comment:
       nit: please add a class-level comment
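
       For example, a sketch of what the class-level comment could say, based
       on the behavior visible in this hunk:

           /**
            * Default implementation that builds the list of views (and their
            * view indexes) for the PHOENIX_TTL deletion job by querying view
            * metadata from SYSTEM.CATALOG.
            */
           public class DefaultPhoenixMultiViewListProvider
                   implements PhoenixMultiViewListProvider {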

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultMultiViewJobStatusTracker implements MultiViewJobStatusTracker {

Review comment:
       nit: please add a class-level comment

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+        boolean isQueryMore = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null;
+        int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)){
+            try (Statement stmt = connection.createStatement()) {
+                do {
+                    ResultSet viewRs = stmt.executeQuery(query);
+                    String schema = null;
+                    String tableName = null;
+                    String tenantId = null;
+                    String fullTableName = null;
+
+                    while (viewRs.next()) {
+                        schema = viewRs.getString(2);
+                        tableName = viewRs.getString(3);
+                        tenantId = viewRs.getString(1);
+                        fullTableName = tableName;
+                        Long viewTtlValue = viewRs.getLong(4);
+
+                        if (schema != null && schema.length() > 0) {
+                            fullTableName = SchemaUtil.getTableName(schema, tableName);
+                        }
+
+                        boolean skip = false;
+                        PTable pTable = null;
+                        try {
+                            pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+                            // we currently only support up to three levels
+                            // CASE 1 : BASE_TABLE -> GLOBAL_VIEW -> TENANT_VIEW
+                            // CASE 2 : BASE_TABLE -> TENANT_VIEW
+                            // CASE 2 : BASE_TABLE -> VIEW
+                            PTable parentTable = PhoenixRuntime.getTable(connection, null,
+                                    pTable.getParentName().toString());
+                            if (parentTable.getType() == PTableType.VIEW &&

Review comment:
       Can you add comments here, please?
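
       For example (a sketch; the rationale is inferred from the surrounding
       code, not confirmed by the author):

           // Skip this view if its parent is itself a view with PHOENIX_TTL set:
           // the TTL is inherited, so the parent's run would otherwise cover
           // the same rows again.
           if (parentTable.getType() == PTableType.VIEW &&
                   parentTable.getPhoenixTTL() > 0) {
               skip = true;
           }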

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.ViewTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {

Review comment:
       nit: please add a class-level comment

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ViewTTLTool.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class ViewTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLTool.class);
+
+    public static enum MR_COUNTER_METRICS {
+        FAILED,
+        SUCCEED
+    }
+
+    public static final String ADDING_DELETION_MARKS_FOR_ALL_VIEWS = "ADDING_DELETION_MARKS_FOR_ALL_VIEWS";
+
+    public static final int DEFAULT_MAPPER_SPLIT_SIZE = 10;
+
+    public static final int DEFAULT_QUERY_BATCH_SIZE = 100;
+
+    private static final Option DELETE_ALL_VIEW_OPTION = new Option("a", "all", false,
+            "Delete all views from all tables.");
+    private static final Option VIEW_NAME_OPTION = new Option("v", "view", true,
+            "Delete Phoenix View Name");
+    private static final Option TENANT_ID_OPTION = new Option("i", "id", true,
+            "Delete an view based on the tenant id.");
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0(highest) to 4");
+    private static final Option SPLIT_SIZE_OPTION = new Option("s", "split-size-per-mapper", true,
+            "Define split size for each mapper.");
+    private static final Option BATCH_SIZE_OPTION = new Option("b", "batch-size-for-query-more", true,
+            "Define batch size for fetching views metadata from syscat.");
+    private static final Option RUN_FOREGROUND_OPTION = new Option("runfg",
+            "run-foreground", false, "If specified, runs ViewTTLTool " +
+            "in Foreground. Default - Runs the build in background");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+    Configuration configuration;
+    Connection connection;
+
+    private String viewName;
+    private String tenantId;
+    private String jobName;
+    private boolean isDeletingAllViews;
+    private JobPriority jobPriority;
+    private boolean isForeground;
+    private int splitSize;
+    private int batchSize;
+    private Job job;
+
+    public void parseArgs(String[] args) {
+        CommandLine cmdLine;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            throw e;
+        }
+
+        if (getConf() == null) {
+            setConf(HBaseConfiguration.create());
+        }
+
+        if (cmdLine.hasOption(DELETE_ALL_VIEW_OPTION.getOpt())) {
+            this.isDeletingAllViews = true;
+        } else if (cmdLine.hasOption(VIEW_NAME_OPTION.getOpt())) {
+            viewName = cmdLine.getOptionValue(VIEW_NAME_OPTION.getOpt());
+            this.isDeletingAllViews = false;
+        }
+
+        if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            tenantId = cmdLine.getOptionValue((TENANT_ID_OPTION.getOpt()));
+        }
+
+        jobPriority = getJobPriority(cmdLine);
+        if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) {
+            splitSize = Integer.valueOf(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+        } else {
+            splitSize = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        if (cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt())) {
+            batchSize = Integer.valueOf(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+        } else {
+            batchSize = DEFAULT_QUERY_BATCH_SIZE;
+        }
+
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    }
+
+    public String getJobPriority() {
+        return this.jobPriority.toString();
+    }
+
+    private JobPriority getJobPriority(CommandLine cmdLine) {
+        String jobPriorityOption = cmdLine.getOptionValue(JOB_PRIORITY_OPTION.getOpt());
+        if (jobPriorityOption == null) {
+            return JobPriority.NORMAL;
+        }
+
+        switch (jobPriorityOption) {
+            case "0" : return JobPriority.VERY_HIGH;
+            case "1" : return JobPriority.HIGH;
+            case "2" : return JobPriority.NORMAL;
+            case "3" : return JobPriority.LOW;
+            case "4" : return JobPriority.VERY_LOW;
+            default:
+                return JobPriority.NORMAL;
+        }
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public boolean isDeletingAllViews() {
+        return this.isDeletingAllViews;
+    }
+
+    public String getTenantId() {
+        return this.tenantId;
+    }
+
+    public String getViewName() {
+        return this.viewName;
+    }
+
+    public int getSplitSize() {
+        return this.splitSize;
+    }
+
+    public int getBatchSize() {
+        return this.batchSize;
+    }
+
+    public CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+        }
+
+        if (!cmdLine.hasOption(DELETE_ALL_VIEW_OPTION.getOpt()) &&
+                !cmdLine.hasOption(VIEW_NAME_OPTION.getOpt()) &&
+                !cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            throw new IllegalStateException("No deletion job is specified, " +
+                    "please indicate deletion job for ALL/TABLE/VIEW/TENANT level");
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        this.jobPriority = getJobPriority(cmdLine);

Review comment:
       This is getting called in multiple places; can we consolidate?
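
       e.g., since parseArgs() calls parseOptions() first, the assignment here
       already ran, so the duplicate in parseArgs() could simply go (sketch):

           // in parseArgs():
           // jobPriority = getJobPriority(cmdLine);   <- redundant; parseOptions() set it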

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
##########
@@ -166,6 +166,27 @@
 
     public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
 
+    // group number of views per mapper to run the deletion job
+    public static final String MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE = "phoenix.mapreduce.multi.input.split.size";
+
+    public static final String MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE = "phoenix.mapreduce.multi.input.batch.size";
+
+    // phoenix ttl data deletion job for a specific view
+    public static final String MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW = "phoenix.mapreduce.view_ttl.view";

Review comment:
       nit: variable name and value do not match

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.ViewTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {
+
+    public List<InputSplit> generateSplits(List<ViewInfoWritable> views, Configuration configuration) {
+        int numViewsInSplit = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
+
+        if (numViewsInSplit < 1) {
+            numViewsInSplit = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        int numberOfMappers = views.size() / numViewsInSplit;
+        if (Math.ceil(views.size() % numViewsInSplit) > 0) {
+            numberOfMappers++;
+        }
+
+        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(numberOfMappers);
+        // Split the views into splits
+
+        for (int i = 0; i < numberOfMappers; i++) {
+            psplits.add(new PhoenixMultiViewInputSplit(views.subList(
+                    i * numViewsInSplit, getUpperBound(numViewsInSplit, i, views.size()))));
+        }
+
+        return psplits;
+    }
+
+    public int getUpperBound(int numViewsInSplit, int i, int viewSize) {

Review comment:
       Does it need to be public?
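
       If only tests need it, package-private would do. A sketch (the min-based
       body is assumed from how generateSplits() uses the result as a subList
       upper bound):

           int getUpperBound(int numViewsInSplit, int i, int viewSize) {
               // end index of split i, capped at the total number of views
               return Math.min((i + 1) * numViewsInSplit, viewSize);
           }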

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.ViewTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {
+
+    public List<InputSplit> generateSplits(List<ViewInfoWritable> views, Configuration configuration) {
+        int numViewsInSplit = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
+

Review comment:
       Check for empty views?
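
       e.g. (sketch, using the Lists helper already imported here):

           if (views == null || views.isEmpty()) {
               return Lists.newArrayList();   // no views, nothing to split
           }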

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+        boolean isQueryMore = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null;
+        int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)){
+            try (Statement stmt = connection.createStatement()) {
+                do {
+                    ResultSet viewRs = stmt.executeQuery(query);
+                    String schema = null;
+                    String tableName = null;
+                    String tenantId = null;
+                    String fullTableName = null;
+
+                    while (viewRs.next()) {
+                        schema = viewRs.getString(2);
+                        tableName = viewRs.getString(3);
+                        tenantId = viewRs.getString(1);
+                        fullTableName = tableName;
+                        Long viewTtlValue = viewRs.getLong(4);
+
+                        if (schema != null && schema.length() > 0) {
+                            fullTableName = SchemaUtil.getTableName(schema, tableName);
+                        }
+
+                        boolean skip = false;
+                        PTable pTable = null;
+                        try {
+                            pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+                            // we currently only support up to three levels

Review comment:
       Why only up to 3 levels?

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+        boolean isQueryMore = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null;
+        int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)){
+            try (Statement stmt = connection.createStatement()) {
+                do {
+                    ResultSet viewRs = stmt.executeQuery(query);
+                    String schema = null;
+                    String tableName = null;
+                    String tenantId = null;
+                    String fullTableName = null;
+
+                    while (viewRs.next()) {
+                        schema = viewRs.getString(2);
+                        tableName = viewRs.getString(3);
+                        tenantId = viewRs.getString(1);
+                        fullTableName = tableName;
+                        Long viewTtlValue = viewRs.getLong(4);
+
+                        if (schema != null && schema.length() > 0) {
+                            fullTableName = SchemaUtil.getTableName(schema, tableName);
+                        }
+
+                        boolean skip = false;
+                        PTable pTable = null;
+                        try {
+                            pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+                            // we currently only support up to three levels
+                            // CASE 1 : BASE_TABLE -> GLOBAL_VIEW -> TENANT_VIEW
+                            // CASE 2 : BASE_TABLE -> TENANT_VIEW
+                            // CASE 2 : BASE_TABLE -> VIEW
+                            PTable parentTable = PhoenixRuntime.getTable(connection, null,
+                                    pTable.getParentName().toString());
+                            if (parentTable.getType() == PTableType.VIEW &&
+                                    parentTable.getPhoenixTTL() > 0) {
+                                skip = true;
+                            }
+                        } catch (Exception e) {
+                            skip = true;
+                            LOGGER.error(String.format("Had an issue to process the view: %s, tenantId:" +
+                                    "see error %s ", fullTableName, tenantId, e.getMessage()));
+                        }
+
+                        if (!skip) {
+                            ViewInfoWritable viewInfoTracker = new ViewInfoTracker(
+                                    tenantId,
+                                    fullTableName,
+                                    viewTtlValue,
+                                    pTable.getPhysicalName().getString(),
+                                    false
+
+                            );
+                            viewInfoWritables.add(viewInfoTracker);
+
+                            List<PTable> allIndexesOnView = pTable.getIndexes();
+                            for (PTable viewIndexTable : allIndexesOnView) {
+                                String indexName = viewIndexTable.getTableName().getString();
+                                String indexSchema = viewIndexTable.getSchemaName().getString();
+                                if (indexName.contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+                                    indexName = SchemaUtil.getTableNameFromFullName(indexName,
+                                            QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+                                }
+                                indexName = SchemaUtil.getTableNameFromFullName(indexName);
+                                indexName = SchemaUtil.getTableName(indexSchema, indexName);
+                                ViewInfoWritable viewInfoTrackerForIndexEntry = new ViewInfoTracker(
+                                        tenantId,
+                                        fullTableName,
+                                        viewTtlValue,
+                                        indexName,
+                                        true
+
+                                );
+                                viewInfoWritables.add(viewInfoTrackerForIndexEntry);
+                            }
+                        }
+                    }
+                    if (isQueryMore) {
+                        if (fullTableName == null) {

Review comment:
       fullTableName is not being used anywhere after this, so why the check?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org