You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ya...@apache.org on 2020/11/18 05:28:44 UTC

[phoenix] branch 4.x updated: PHOENIX-5601 Add a new coprocessor for PHOENIX_TTL - PhoenixTTLRegionObserver

This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 510ca96  PHOENIX-5601 Add a new coprocessor for PHOENIX_TTL - PhoenixTTLRegionObserver
510ca96 is described below

commit 510ca96dbcd713db73402e2e3c25a5cf0e3b1702
Author: Jacob Isaac <ji...@salesforce.com>
AuthorDate: Mon Nov 16 13:22:19 2020 -0800

    PHOENIX-5601 Add a new coprocessor for PHOENIX_TTL - PhoenixTTLRegionObserver
    
    Signed-off-by: Xinyi Yan <ya...@apache.org>
---
 .../org/apache/phoenix/end2end/EmptyColumnIT.java  |    6 +-
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java | 2017 +++++++++++++++++++-
 .../phoenix/compile/ServerBuildIndexCompiler.java  |    3 +-
 .../coprocessor/BaseScannerRegionObserver.java     |    5 +-
 .../coprocessor/PhoenixTTLRegionObserver.java      |  307 +++
 .../MetricsPhoenixCoprocessorSourceFactory.java    |   45 +
 .../metrics/MetricsPhoenixTTLSource.java           |   61 +
 .../metrics/MetricsPhoenixTTLSourceImpl.java       |   58 +
 .../phoenix/iterate/TableResultIterator.java       |    4 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |   11 +
 .../org/apache/phoenix/query/QueryServices.java    |    2 +
 .../apache/phoenix/query/QueryServicesOptions.java |    5 +-
 .../java/org/apache/phoenix/util/IndexUtil.java    |  133 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  288 ++-
 .../apache/phoenix/query/PhoenixTestBuilder.java   |  552 +++++-
 .../java/org/apache/phoenix/util/ScanUtilTest.java |  106 +-
 16 files changed, 3318 insertions(+), 285 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EmptyColumnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EmptyColumnIT.java
index 184a053..72413b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EmptyColumnIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EmptyColumnIT.java
@@ -800,7 +800,7 @@ public class EmptyColumnIT extends ParallelStatsDisabledIT {
     private void upsertDataAndRunValidations(int numRowsToUpsert,
             ExpectedTestResults expectedTestResults, DataWriter dataWriter,
             SchemaBuilder schemaBuilder, List<Integer> overriddenColumnsPositions)
-            throws IOException, SQLException {
+            throws Exception {
 
         //Insert for the first time and validate them.
         validateEmptyColumnsAreUpdated(upsertData(dataWriter, numRowsToUpsert), expectedTestResults,
@@ -820,7 +820,7 @@ public class EmptyColumnIT extends ParallelStatsDisabledIT {
     private void upsertDataAndRunValidations(int numRowsToUpsert,
             ExpectedTestResults expectedTestResults, DataWriter dataWriter,
             SchemaBuilder schemaBuilder, List<Integer> overriddenColumnsPositions,
-            List<Integer> indexedCFPositions) throws IOException, SQLException {
+            List<Integer> indexedCFPositions) throws Exception {
 
         //Insert for the first time and validate them.
         validateEmptyColumnsAreUpdated(upsertData(dataWriter, numRowsToUpsert), expectedTestResults,
@@ -837,7 +837,7 @@ public class EmptyColumnIT extends ParallelStatsDisabledIT {
                 schemaBuilder, indexedCFPositions);
     }
 
-    private long upsertData(DataWriter dataWriter, int numRowsToUpsert) throws SQLException {
+    private long upsertData(DataWriter dataWriter, int numRowsToUpsert) throws Exception {
         // Upsert rows
         long earliestTimestamp = System.currentTimeMillis();
         for (int i = 0; i < numRowsToUpsert; i++) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index 09fb765..c9fb506 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -18,6 +18,10 @@
 
 package org.apache.phoenix.end2end;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -30,18 +34,36 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SubstringComparator;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataReader;
+import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataWriter;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataReader;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataSupplier;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataWriter;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.OtherOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,16 +74,29 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.COLUMN_TYPES;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.DEFAULT_SCHEMA_NAME;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.TENANT_VIEW_COLUMNS;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ViewTTLIT extends ParallelStatsDisabledIT {
-
     private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLIT.class);
     private static final String ORG_ID_FMT = "00D0x000%s";
     private static final String ID_FMT = "00A0y000%07d";
+    private static final String ZID_FMT = "00B0y000%07d";
     private static final String PHOENIX_TTL_HEADER_SQL = "SELECT PHOENIX_TTL FROM SYSTEM.CATALOG "
             + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = '%s'";
 
@@ -70,6 +105,36 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
 
     private static final String ALTER_SQL_WITH_NO_TTL
             = "ALTER VIEW \"%s\".\"%s\" ADD IF NOT EXISTS %s CHAR(10)";
+    private static final int DEFAULT_NUM_ROWS = 5;
+
+    private static final String COL1_FMT = "a%05d";
+    private static final String COL2_FMT = "b%05d";
+    private static final String COL3_FMT = "c%05d";
+    private static final String COL4_FMT = "d%05d";
+    private static final String COL5_FMT = "e%05d";
+    private static final String COL6_FMT = "f%05d";
+    private static final String COL7_FMT = "g%05d";
+    private static final String COL8_FMT = "h%05d";
+    private static final String COL9_FMT = "i%05d";
+
+    // Scans the HBase rows directly and asserts
+    private void assertUsingHBaseRows(byte[] hbaseTableName,
+            long minTimestamp, int expectedRows) throws IOException, SQLException {
+
+        try (Table tbl = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+                .getTable(hbaseTableName)) {
+
+            Scan allRows = new Scan();
+            allRows.setTimeRange(minTimestamp, minTimestamp + Integer.MAX_VALUE);
+            ResultScanner scanner = tbl.getScanner(allRows);
+            int numMatchingRows = 0;
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                numMatchingRows++;
+            }
+            assertEquals(String.format("Expected rows do match for table = %s at timestamp %d",
+                    Bytes.toString(hbaseTableName), minTimestamp), expectedRows, numMatchingRows);
+        }
+    }
 
     // Scans the HBase rows directly for the view ttl related header rows column and asserts
     private void assertViewHeaderRowsHavePhoenixTTLRelatedCells(String schemaName,
@@ -227,7 +292,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
      * -----------------
      */
 
-    @Test public void testWithBasicGlobalViewWithNoPhoenixTTLDefined() throws Exception {
+    @Test
+    public void testWithBasicGlobalViewWithNoPhoenixTTLDefined() throws Exception {
 
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
@@ -244,7 +310,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
                 schemaBuilder.getTableOptions().getSchemaName(), startTime, true, 2);
     }
 
-    @Test public void testPhoenixTTLWithTableLevelTTLFails() throws Exception {
+    @Test
+    public void testPhoenixTTLWithTableLevelTTLFails() throws Exception {
 
         // Define the test schema.
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
@@ -266,7 +333,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         }
     }
 
-    @Test public void testPhoenixTTLWithViewIndexFails() throws Exception {
+    @Test
+    public void testPhoenixTTLWithViewIndexFails() throws Exception {
 
         TenantViewIndexOptions tenantViewIndexOptions = TenantViewIndexOptions.withDefaults();
         tenantViewIndexOptions.setIndexProps("PHOENIX_TTL=1000");
@@ -280,7 +348,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         }
     }
 
-    @Test public void testPhoenixTTLForLevelOneView() throws Exception {
+    @Test
+    public void testPhoenixTTLForLevelOneView() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
@@ -308,7 +377,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
 
     }
 
-    @Test public void testPhoenixTTLForLevelTwoView() throws Exception {
+    @Test
+    public void testPhoenixTTLForLevelTwoView() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         final SchemaBuilder schemaBuilder = createLevel2TenantViewWithGlobalLevelTTL(null, null);
@@ -341,7 +411,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
                 PTableType.INDEX.getSerializedValue(), 300000);
     }
 
-    @Test public void testPhoenixTTLForWhenTTLIsZero() throws Exception {
+    @Test
+    public void testPhoenixTTLForWhenTTLIsZero() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
@@ -369,7 +440,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
 
     }
 
-    @Test public void testPhoenixTTLWithAlterView() throws Exception {
+    @Test
+    public void testPhoenixTTLWithAlterView() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
@@ -417,7 +489,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
 
     }
 
-    @Test public void testCreateViewWithParentPhoenixTTLFails() throws Exception {
+    @Test
+    public void testCreateViewWithParentPhoenixTTLFails() throws Exception {
         try {
             TenantViewOptions tenantViewWithOverrideOptions = TenantViewOptions.withDefaults();
             // Phoenix TTL is set to 120s => 120000 ms
@@ -431,7 +504,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         }
     }
 
-    @Test public void testAlterViewWithParentPhoenixTTLFails() throws Exception {
+    @Test
+    public void testAlterViewWithParentPhoenixTTLFails() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         // Phoenix TTL is set to 300s
@@ -479,7 +553,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         }
     }
 
-    @Test public void testAlterViewWithChildLevelPhoenixTTLFails() throws Exception {
+    @Test
+    public void testAlterViewWithChildLevelPhoenixTTLFails() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         // Phoenix TTL is set to 300s
@@ -520,7 +595,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         }
     }
 
-    @Test public void testAlterViewWithNoPhoenixTTLSucceed() throws Exception {
+    @Test
+    public void testAlterViewWithNoPhoenixTTLSucceed() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         // Phoenix TTL is set to 300s
@@ -568,7 +644,8 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
 
     }
 
-    @Test public void testResetPhoenixTTL() throws Exception {
+    @Test
+    public void testResetPhoenixTTL() throws Exception {
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
         final SchemaBuilder schemaBuilder = createLevel2TenantViewWithGlobalLevelTTL(null, null);
@@ -608,4 +685,1916 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         assertSyscatHavePhoenixTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName,
                 PTableType.INDEX.getSerializedValue(), 300000);
     }
+
+
+    @Test
+    public void testWithTenantViewAndNoGlobalView() throws Exception {
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions)
+                .build();
+
+        // Define the test data.
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format(ID_FMT, rowIndex);
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                return Lists.newArrayList(new Object[] { zid, col7, col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ZID");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            dataReader.setValidationColumns(columns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT %s from %s", Joiner.on(",").join(columns),
+                    schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS, dataWriter, dataReader,
+                    schemaBuilder);
+        }
+    }
+
+    @Test
+    public void testWithSQLUsingIndexWithCoveredColsUpdates() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+            String col4ForWhereClause;
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format(ID_FMT, rowIndex);
+                String zid = String.format(ZID_FMT, rowIndex);
+                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists
+                        .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+                    schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+        }
+    }
+
+    /**
+     * Ensure/validate that empty columns for the index are still updated even when covered columns
+     * are not updated.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testWithSQLUsingIndexAndNoCoveredColsUpdates() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        GlobalViewIndexOptions globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format(ID_FMT, rowIndex);
+                String zid = String.format(ZID_FMT, rowIndex);
+                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists
+                        .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> nonCoveredColumns =
+                Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+                    schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+            // Now update the above data but not modifying the covered columns.
+            // Ensure/validate that empty columns for the index are still updated.
+
+            // Data supplier where covered and included (col4 and col6) columns are not updated.
+            DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String id = String.format(ID_FMT, rowIndex);
+                    String zid = String.format(ZID_FMT, rowIndex);
+                    String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 });
+                }
+            };
+
+            // Upsert data for validation with non covered columns
+            dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+            dataWriter.setUpsertColumns(nonCoveredColumns);
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+            dataReader.setValidationColumns(rowKeyColumns1);
+            dataReader.setRowKeyColumns(rowKeyColumns1);
+            dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'",
+                    schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+        }
+    }
+
+
+    /**
+     * Ensure/validate that correct parent's phoenix ttl value is used when queries are using the
+     * index.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testWithSQLUsingIndexAndMultiLevelViews() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        GlobalViewIndexOptions globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+
+        String level3ViewName = String.format("%s.%s",
+                DEFAULT_SCHEMA_NAME, "E11");
+        String level3ViewCreateSQL = String.format("CREATE VIEW IF NOT EXISTS %s AS SELECT * FROM %s",
+                level3ViewName,
+                schemaBuilder.getEntityTenantViewName());
+        String tConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection tConnection = DriverManager.getConnection(tConnectUrl)) {
+            tConnection.createStatement().execute(level3ViewCreateSQL);
+        }
+
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format(ID_FMT, rowIndex);
+                String zid = String.format(ZID_FMT, rowIndex);
+                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists
+                        .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> nonCoveredColumns =
+                Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(level3ViewName);
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+                    level3ViewName, outerCol4s.get(1)));
+            dataReader.setTargetEntity(level3ViewName);
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+            // Now update the above data but not modifying the covered columns.
+            // Ensure/validate that empty columns for the index are still updated.
+
+            // Data supplier where covered and included (col4 and col6) columns are not updated.
+            DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String id = String.format(ID_FMT, rowIndex);
+                    String zid = String.format(ZID_FMT, rowIndex);
+                    String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 });
+                }
+            };
+
+            // Upsert data for validation with non covered columns
+            dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+            dataWriter.setUpsertColumns(nonCoveredColumns);
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+            dataReader.setValidationColumns(rowKeyColumns1);
+            dataReader.setRowKeyColumns(rowKeyColumns1);
+            dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'",
+                    level3ViewName, outerCol4s.get(1)));
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+        }
+    }
+
+    @Test
+    public void testWithVariousSQLs() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        GlobalViewIndexOptions globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions.setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+        // Define the test data.
+        final String groupById = String.format(ID_FMT, 0);
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format(ZID_FMT, rowIndex);
+                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { groupById, zid, col4, col5, col6, col7, col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            // Case : count(1) sql
+            DataReader dataReader = new BasicDataReader();
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String
+                    .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                            schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+            // Case : group by sql
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String
+                    .format("SELECT count(1) as num_rows from %s GROUP BY ID HAVING count(1) > 0",
+                            schemaBuilder.getEntityTenantViewName(), groupById));
+
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+        }
+    }
+
+    @Test
+    public void testWithVariousSQLsForMultipleTenants() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        GlobalViewIndexOptions globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+        for (int tenant : Arrays.asList(new Integer[] { 1, 2, 3 })) {
+            // build schema for tenant
+            schemaBuilder.buildWithNewTenant();
+
+            // Define the test data.
+            final String groupById = String.format(ID_FMT, 0);
+            DataSupplier dataSupplier = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String zid = String.format(ZID_FMT, rowIndex);
+                    String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(
+                            new Object[] { groupById, zid, col4, col5, col6, col7, col8, col9 });
+                }
+            };
+
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            List<String> columns =
+                    Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+            String tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+            try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                // Case : count(1) sql
+                DataReader dataReader = new BasicDataReader();
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String
+                        .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                                schemaBuilder.getEntityTenantViewName()));
+                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+                // Case : group by sql
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String
+                        .format("SELECT count(1) as num_rows from %s GROUP BY ID HAVING count(1) > 0",
+                                schemaBuilder.getEntityTenantViewName()));
+
+                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+            }
+        }
+    }
+
+    @Test
+    public void testWithVariousSQLsForMultipleViews() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+        for (int view : Arrays.asList(new Integer[] { 1, 2, 3 })) {
+            // build schema for new view
+            schemaBuilder.buildNewView();
+
+            // Define the test data.
+            DataSupplier dataSupplier = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String zid = String.format(ZID_FMT, rowIndex);
+                    String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                    String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(new Object[] { zid, col7, col8, col9 });
+                }
+            };
+
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("ZID");
+            String tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+            try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                // Case : count(1) sql
+                DataReader dataReader = new BasicDataReader();
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String
+                        .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                                schemaBuilder.getEntityTenantViewName()));
+                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+
+                // Case : group by sql
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String
+                        .format("SELECT count(1) as num_rows from %s GROUP BY ZID HAVING count(1) > 0",
+                                schemaBuilder.getEntityTenantViewName()));
+
+                dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+            }
+        }
+    }
+
+    @Test
+    public void testWithTenantViewAndGlobalViewAndVariousOptions() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+
+        // Define the test schema
+        TableOptions tableOptions = TableOptions.withDefaults();
+        String tableProps = "MULTI_TENANT=true,COLUMN_ENCODED_BYTES=0,DEFAULT_COLUMN_FAMILY='0'";
+        tableOptions.setTableProps(tableProps);
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        GlobalViewIndexOptions globalViewIndexOptions = GlobalViewIndexOptions.withDefaults();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(Lists.newArrayList(TENANT_VIEW_COLUMNS));
+        tenantViewOptions.setTenantViewColumnTypes(Lists.newArrayList(COLUMN_TYPES));
+
+        TenantViewIndexOptions tenantViewIndexOptions = TenantViewIndexOptions.withDefaults();
+        // Test cases :
+        // Local vs Global indexes, Tenant vs Global views, various column family options.
+        for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) {
+            for (boolean isTenantViewLocal : Lists.newArrayList(true, false)) {
+                for (OtherOptions options : getTableAndGlobalAndTenantColumnFamilyOptions()) {
+
+                    /**
+                     * TODO:
+                     * Need to revisit indexing code path,
+                     * as there are some left over rows after delete in these cases.
+                     */
+                    if (!isGlobalViewLocal && !isTenantViewLocal) continue;
+
+                    globalViewIndexOptions.setLocal(isGlobalViewLocal);
+                    tenantViewIndexOptions.setLocal(isTenantViewLocal);
+
+                    final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+                    schemaBuilder.withTableOptions(tableOptions)
+                            .withGlobalViewOptions(globalViewOptions)
+                            .withGlobalViewIndexOptions(globalViewIndexOptions)
+                            .withTenantViewOptions(tenantViewOptions)
+                            .withTenantViewIndexOptions(tenantViewIndexOptions)
+                            .withOtherOptions(options)
+                            .buildWithNewTenant();
+
+                    // Define the test data.
+                    DataSupplier dataSupplier = new DataSupplier() {
+
+                        @Override public List<Object> getValues(int rowIndex) {
+                            Random rnd = new Random();
+                            String id = String.format(ID_FMT, rowIndex);
+                            String col1 = String.format(COL1_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col2 = String.format(COL2_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col3 = String.format(COL3_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                            return Lists.newArrayList(
+                                    new Object[] { id, col1, col2, col3, col4, col5, col6,
+                                                   col7, col8, col9 });
+                        }
+                    };
+
+                    long earliestTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+                    // Create a test data reader/writer for the above schema.
+                    DataWriter dataWriter = new BasicDataWriter();
+                    DataReader dataReader = new BasicDataReader();
+
+                    List<String> columns =
+                            Lists.newArrayList("ID",
+                                    "COL1", "COL2", "COL3", "COL4", "COL5",
+                                    "COL6", "COL7", "COL8", "COL9");
+                    List<String> rowKeyColumns = Lists.newArrayList("ID");
+                    String tenantConnectUrl =
+                            getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
+                                    schemaBuilder.getDataOptions().getTenantId();
+                    try (Connection writeConnection = DriverManager
+                            .getConnection(tenantConnectUrl)) {
+                        writeConnection.setAutoCommit(true);
+                        dataWriter.setConnection(writeConnection);
+                        dataWriter.setDataSupplier(dataSupplier);
+                        dataWriter.setUpsertColumns(columns);
+                        dataWriter.setRowKeyColumns(rowKeyColumns);
+                        dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                        dataReader.setValidationColumns(columns);
+                        dataReader.setRowKeyColumns(rowKeyColumns);
+                        dataReader.setDML(String
+                                .format("SELECT %s from %s", Joiner.on(",").join(columns),
+                                        schemaBuilder.getEntityTenantViewName()));
+                        dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                        // Validate data before and after ttl expiration.
+                        upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS, dataWriter,
+                                dataReader, schemaBuilder);
+                    }
+
+                    PTable table = schemaBuilder.getBaseTable();
+                    String schemaName = table.getSchemaName().getString();
+                    String tableName = table.getTableName().getString();
+
+                    long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis() +
+                            (phoenixTTL * 1000);
+                    // Delete expired data rows using global connection.
+                    deleteData(true,
+                            null,
+                            schemaBuilder.getEntityGlobalViewName(),
+                            scnTimestamp);
+
+                    if (schemaBuilder.isGlobalViewIndexEnabled() &&
+                            schemaBuilder.isGlobalViewIndexCreated()) {
+                        String viewIndexName = String
+                                .format("%s.IDX_%s",
+                                        schemaName,
+                                        SchemaUtil.getTableNameFromFullName(
+                                                schemaBuilder.getEntityGlobalViewName()));
+                        // Delete expired index rows using global connection.
+                        deleteIndexData(true,
+                                null,
+                                viewIndexName,
+                                scnTimestamp);
+                    }
+
+                    if (schemaBuilder.isTenantViewIndexEnabled() &&
+                            schemaBuilder.isTenantViewIndexCreated()) {
+                        String viewIndexName = String
+                                .format("%s.IDX_%s",
+                                        schemaName,
+                                        SchemaUtil.getTableNameFromFullName(
+                                                schemaBuilder.getEntityTenantViewName()));
+
+                        // Delete expired index rows using tenant connection.
+                        deleteIndexData(false,
+                                schemaBuilder.getDataOptions().getTenantId(),
+                                viewIndexName,
+                                scnTimestamp);
+                    }
+                    // Verify after deleting TTL expired data.
+                    Properties props = new Properties();
+                    props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+
+                    try (Connection readConnection = DriverManager
+                            .getConnection(tenantConnectUrl, props)) {
+
+                        dataReader.setConnection(readConnection);
+                        com.google.common.collect.Table<String, String, Object>
+                                fetchedData =
+                                fetchData(dataReader);
+                        assertTrue("Deleted rows should not be fetched",
+                                fetchedData.rowKeySet().size() == 0);
+                    }
+
+                    byte[] hbaseBaseTableName = SchemaUtil.getTableNameAsBytes(
+                            schemaName,tableName);
+                    String viewIndexSchemaName = String
+                            .format("_IDX_%s", table.getSchemaName().getString());
+                    byte[] hbaseViewIndexTableName =
+                            SchemaUtil.getTableNameAsBytes(viewIndexSchemaName, tableName);
+                    // Validate deletes using hbase
+                    assertUsingHBaseRows(hbaseBaseTableName, earliestTimestamp, 0);
+                    assertUsingHBaseRows(hbaseViewIndexTableName, earliestTimestamp, 0);
+                }
+            }
+        }
+    }
+
+    /**
+     * ************************************************************
+     * Case: Build schema with TTL set by the tenant view.
+     * TTL for GLOBAL_VIEW - 300000ms (not set)
+     * TTL for TENANT_VIEW - 300000ms
+     * ************************************************************
+     */
+
+    @Test
+    public void testGlobalAndTenantViewTTLInheritance1() throws Exception {
+        // PHOENIX TTL is set in seconds (for e.g 200 secs)
+        long tenantPhoenixTTL = 200;
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        SchemaBuilder.GlobalViewOptions globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+
+        SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewWithOverrideOptions = new TenantViewOptions();
+        tenantViewWithOverrideOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewWithOverrideOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+        tenantViewWithOverrideOptions.setTableProps(String.format("PHOENIX_TTL=%d", tenantPhoenixTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        /**
+         * ************************************************************
+         * Build schema with TTL set by the tenant view.
+         * ************************************************************
+         */
+
+        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewWithOverrideOptions).withTenantViewIndexDefaults()
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).buildWithNewTenant();
+
+        // Define the test data.
+        final String id = String.format(ID_FMT, 0);
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format(ZID_FMT, rowIndex);
+                String col1 = String.format(COL1_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col2 = String.format(COL2_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col3 = String.format(COL3_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col1, col2, col3, col4, col5, col6, col7, col8,
+                                       col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID",
+                        "COL1", "COL2", "COL3", "COL4", "COL5", "COL6",
+                        "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+        String tenant1ConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenant1ConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            // Case : count(1) sql
+            DataReader dataReader = new BasicDataReader();
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String
+                    .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                            schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data exists before ttl expiration.
+            long probeTimestamp = EnvironmentEdgeManager.currentTimeMillis()
+                    + ((tenantPhoenixTTL * 1000) / 2);
+            validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader, schemaBuilder);
+            // Validate data before and after ttl expiration.
+            // Use the tenant phoenix ttl since that is what the view has set.
+            validateExpiredRowsAreNotReturnedUsingCounts(tenantPhoenixTTL, dataReader, schemaBuilder);
+        }
+    }
+
+    /**
+     * ************************************************************
+     * Case: Build schema with TTL set by the global view.
+     * TTL for GLOBAL_VIEW - 300000ms
+     * TTL for TENANT_VIEW - 300000ms (not set, uses global view ttl)
+     * ************************************************************
+     */
+
+    @Test
+    public void testGlobalAndTenantViewTTLInheritance2() throws Exception {
+        // PHOENIX TTL is set in seconds (for e.g 300 secs)
+        long globalPhoenixTTL = 300;
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        SchemaBuilder.GlobalViewOptions globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", globalPhoenixTTL));
+
+        SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewWithOverrideOptions = new TenantViewOptions();
+        tenantViewWithOverrideOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewWithOverrideOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        /**
+         * ************************************************************
+         * Case: Build schema with TTL set by the global view.
+         * ************************************************************
+         */
+
+        schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewWithOverrideOptions).withTenantViewIndexDefaults()
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).buildWithNewTenant();
+
+        // Define the test data.
+        final String id = String.format(ID_FMT, 0);
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format(ZID_FMT, rowIndex);
+                String col1 = String.format(COL1_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col2 = String.format(COL2_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col3 = String.format(COL3_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col1, col2, col3, col4, col5, col6, col7, col8,
+                                       col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        List<String> columns =
+                Lists.newArrayList("ID", "ZID",
+                        "COL1", "COL2", "COL3", "COL4", "COL5", "COL6",
+                        "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+        String tenant1ConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenant1ConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            // Case : count(1) sql
+            DataReader dataReader = new BasicDataReader();
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String
+                    .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                            schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data exists before ttl expiration.
+            long probeTimestamp = EnvironmentEdgeManager.currentTimeMillis()
+                    + ((globalPhoenixTTL * 1000) / 2);
+            validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader, schemaBuilder);
+            // Validate data before and after ttl expiration.
+            // Use the global phoenix ttl since that is what the view has inherited.
+            validateExpiredRowsAreNotReturnedUsingCounts(globalPhoenixTTL, dataReader, schemaBuilder);
+        }
+    }
+
+
+    @Test
+    public void testScanAttributes() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .build();
+
+        String viewName = schemaBuilder.getEntityTenantViewName();
+
+        Properties props = new Properties();
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+
+        // Test setting masking expired rows property
+        try (Connection conn = DriverManager.getConnection(tenantConnectUrl, props);
+                final Statement statement = conn.createStatement()) {
+            conn.setAutoCommit(true);
+
+            final String stmtString = String.format("select * from  %s", viewName);
+            Preconditions.checkNotNull(stmtString);
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            final QueryPlan queryPlan = pstmt.optimizeQuery(stmtString);
+
+            PTable table = PhoenixRuntime
+                    .getTable(conn, schemaBuilder.getDataOptions().getTenantId(), viewName);
+
+            PhoenixResultSet
+                    rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
+            Assert.assertFalse("Should not have any rows", rs.next());
+            Assert.assertEquals("Should have atleast one element", 1, queryPlan.getScans().size());
+            Assert.assertEquals("PhoenixTTL does not match",
+                    phoenixTTL*1000, ScanUtil.getPhoenixTTL(queryPlan.getScans().get(0).get(0)));
+            Assert.assertTrue("Masking attribute should be set",
+                    ScanUtil.isMaskTTLExpiredRows(queryPlan.getScans().get(0).get(0)));
+            Assert.assertFalse("Delete Expired attribute should not set",
+                    ScanUtil.isDeleteTTLExpiredRows(queryPlan.getScans().get(0).get(0)));
+        }
+
+        // Test setting delete expired rows property
+        try (Connection conn = DriverManager.getConnection(tenantConnectUrl, props);
+                final Statement statement = conn.createStatement()) {
+            conn.setAutoCommit(true);
+
+            final String stmtString = String.format("select * from  %s", viewName);
+            Preconditions.checkNotNull(stmtString);
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            final QueryPlan queryPlan = pstmt.optimizeQuery(stmtString);
+            final Scan scan = queryPlan.getContext().getScan();
+
+            PTable table = PhoenixRuntime
+                    .getTable(conn, schemaBuilder.getDataOptions().getTenantId(), viewName);
+
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] emptyColumnName =
+                    table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+
+            PhoenixResultSet
+                    rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
+            Assert.assertFalse("Should not have any rows", rs.next());
+            Assert.assertEquals("Should have atleast one element", 1, queryPlan.getScans().size());
+            Assert.assertEquals("PhoenixTTL does not match",
+                    phoenixTTL*1000, ScanUtil.getPhoenixTTL(queryPlan.getScans().get(0).get(0)));
+            Assert.assertFalse("Masking attribute should not be set",
+                    ScanUtil.isMaskTTLExpiredRows(queryPlan.getScans().get(0).get(0)));
+            Assert.assertTrue("Delete Expired attribute should be set",
+                    ScanUtil.isDeleteTTLExpiredRows(queryPlan.getScans().get(0).get(0)));
+        }
+    }
+
+    @Test
+    public void testDeleteWithOnlyTenantView() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        String tableProps = "MULTI_TENANT=true,COLUMN_ENCODED_BYTES=0,DEFAULT_COLUMN_FAMILY='0'";
+        tableOptions.setTableProps(tableProps);
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .buildWithNewTenant();
+
+        // Define the test data.
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format(ZID_FMT, rowIndex);
+                String col1 = String.format(COL1_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col2 = String.format(COL2_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col3 = String.format(COL3_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                return Lists.newArrayList(new Object[] { zid, col1, col2, col3, col7, col8, col9 });
+            }
+        };
+
+        long earliestTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ZID", "COL1", "COL2", "COL3",
+                "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ZID");
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+        try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            dataReader.setValidationColumns(columns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT %s from %s", Joiner.on(",").join(columns),
+                    schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS, dataWriter, dataReader,
+                    schemaBuilder);
+        }
+
+        long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis() + (phoenixTTL * 1000);
+        // Delete expired rows using tenant connection.
+        deleteData(false,
+                schemaBuilder.getDataOptions().getTenantId(),
+                schemaBuilder.getEntityTenantViewName(),
+                scnTimestamp);
+
+        // Verify after deleting TTL expired data.
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+
+        try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object>
+                    fetchedData =
+                    fetchData(dataReader);
+            assertEquals("Deleted rows should not be fetched", 0,fetchedData.rowKeySet().size());
+        }
+
+        // Validate deletes using hbase
+        PTable table = schemaBuilder.getBaseTable();
+        String schemaName = table.getSchemaName().getString();
+        String tableName = table.getTableName().getString();
+        byte[] hbaseBaseTableName = SchemaUtil.getTableNameAsBytes(schemaName,tableName);
+        assertUsingHBaseRows(hbaseBaseTableName, earliestTimestamp, 0);
+    }
+
+    @Test
+    public void testDeleteFromMultipleGlobalIndexes() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        String tableProps = "MULTI_TENANT=true,COLUMN_ENCODED_BYTES=0,DEFAULT_COLUMN_FAMILY='0'";
+        tableOptions.setTableProps(tableProps);
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        GlobalViewIndexOptions globalViewIndexOptions = GlobalViewIndexOptions.withDefaults();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(Lists.newArrayList(TENANT_VIEW_COLUMNS));
+        tenantViewOptions.setTenantViewColumnTypes(Lists.newArrayList(COLUMN_TYPES));
+
+        // Test cases :
+        // Local vs Global indexes, various column family options.
+        for (boolean isIndex1Local : Lists.newArrayList(true, false)) {
+            for (boolean isIndex2Local : Lists.newArrayList(true, false)) {
+                for (OtherOptions options : getTableAndGlobalAndTenantColumnFamilyOptions()) {
+                    // Define the test schema.
+                    final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+                    schemaBuilder
+                            .withTableOptions(tableOptions)
+                            .withGlobalViewOptions(globalViewOptions)
+                            .withGlobalViewIndexOptions(globalViewIndexOptions)
+                            .withTenantViewOptions(tenantViewOptions)
+                            .withOtherOptions(options).build();
+
+                    String index1Name;
+                    String index2Name;
+                    try (Connection globalConn = DriverManager.getConnection(getUrl());
+                            final Statement statement = globalConn.createStatement()) {
+
+                        index1Name = String.format("IDX_%s_%s",
+                                schemaBuilder.getEntityGlobalViewName().replaceAll("\\.", "_"),
+                                "COL4");
+
+                        final String index1Str = String.format("CREATE %s INDEX IF NOT EXISTS "
+                                        + "%s ON %s (%s) INCLUDE (%s)", isIndex1Local ? "LOCAL" : "", index1Name,
+                                schemaBuilder.getEntityGlobalViewName(), "COL4", "COL5"
+                        );
+                        statement.execute(index1Str);
+
+                        index2Name = String.format("IDX_%s_%s",
+                                schemaBuilder.getEntityGlobalViewName().replaceAll("\\.", "_"),
+                                "COL5");
+
+                        final String index2Str = String.format("CREATE %s INDEX IF NOT EXISTS "
+                                        + "%s ON %s (%s) INCLUDE (%s)", isIndex2Local ? "LOCAL" : "", index2Name,
+                                schemaBuilder.getEntityGlobalViewName(), "COL5", "COL6"
+                        );
+                        statement.execute(index2Str);
+                    }
+
+                    long earliestTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+                    // Create multiple tenants, add data and run validations
+                    for (int tenant : Arrays.asList(new Integer[] { 1, 2, 3 })) {
+                        // build schema for tenant
+                        schemaBuilder.buildWithNewTenant();
+                        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+
+                        // Define the test data.
+                        DataSupplier dataSupplier = new DataSupplier() {
+
+                            @Override public List<Object> getValues(int rowIndex) {
+                                Random rnd = new Random();
+                                String id = String.format(ID_FMT, rowIndex);
+                                //String zid = String.format(ZID_FMT, rowIndex);
+                                String col4 = String.format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col5 = String.format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col6 = String.format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col7 = String.format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col8 = String.format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col9 = String.format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                                return Lists.newArrayList(
+                                        new Object[] { id, col4, col5, col6, col7, col8, col9 });
+                            }
+                        };
+
+                        // Create a test data reader/writer for the above schema.
+                        DataWriter dataWriter = new BasicDataWriter();
+                        List<String> columns = Lists.newArrayList(
+                                "ID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+                        List<String> rowKeyColumns = Lists.newArrayList("ID");
+                        String tenantConnectUrl =
+                                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+                        try (Connection writeConnection =
+                                DriverManager.getConnection(tenantConnectUrl)) {
+                            writeConnection.setAutoCommit(true);
+                            dataWriter.setConnection(writeConnection);
+                            dataWriter.setDataSupplier(dataSupplier);
+                            dataWriter.setUpsertColumns(columns);
+                            dataWriter.setRowKeyColumns(rowKeyColumns);
+                            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                            // Case : count(1) sql
+                            DataReader dataReader = new BasicDataReader();
+                            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                            dataReader.setDML(String.format(
+                                    "SELECT /* +NO_INDEX */ count(1) as num_rows from %s HAVING count(1) > 0",
+                                            schemaBuilder.getEntityTenantViewName()));
+                            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                            // Validate data before and after ttl expiration.
+                            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder);
+                        }
+                    }
+
+                    PTable table = schemaBuilder.getBaseTable();
+                    String schemaName = table.getSchemaName().getString();
+                    String tableName = table.getTableName().getString();
+                    byte[] hbaseBaseTableName = SchemaUtil.getTableNameAsBytes(schemaName,tableName);
+
+                    // Delete data and index rows
+                    long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis() +
+                            (phoenixTTL * 1000);
+
+                    // Delete expired data rows using global connection.
+                    deleteData(true,
+                            null,
+                            schemaBuilder.getEntityGlobalViewName(),
+                            scnTimestamp);
+
+                    String defaultViewIndexName = String
+                            .format("%s.IDX_%s",
+                                    schemaName,
+                                    SchemaUtil.getTableNameFromFullName(
+                                            schemaBuilder.getEntityGlobalViewName()));
+                    // Delete expired index (default) rows using global connection.
+                    deleteIndexData(true,
+                            null,
+                            defaultViewIndexName,
+                            scnTimestamp);
+
+                    // Delete expired index(1) rows using global connection.
+                    String viewIndex1Name = String.format("%s.%s", schemaName, index1Name);
+                    deleteIndexData(true,
+                            null,
+                            viewIndex1Name,
+                            scnTimestamp);
+
+                    // Delete expired index(2) rows using global connection.
+                    String viewIndex2Name = String.format("%s.%s", schemaName, index2Name);
+                    deleteIndexData(true,
+                            null,
+                            viewIndex2Name,
+                            scnTimestamp);
+
+                    String viewIndexSchemaName = String
+                            .format("_IDX_%s", schemaName);
+                    byte[] hbaseViewIndexTableName =
+                            SchemaUtil.getTableNameAsBytes(viewIndexSchemaName, tableName);
+
+                    assertUsingHBaseRows(hbaseBaseTableName, earliestTimestamp, 0);
+                    assertUsingHBaseRows(hbaseViewIndexTableName, earliestTimestamp, 0);
+                }
+            }
+        }
+    }
+
+    @Test public void testDeleteFromMultipleTenantIndexes() throws Exception {
+
+        // PHOENIX TTL is set in seconds (for e.g 10 secs)
+        long phoenixTTL = 10;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        String tableProps = "MULTI_TENANT=true,COLUMN_ENCODED_BYTES=0,DEFAULT_COLUMN_FAMILY='0'";
+        tableOptions.setTableProps(tableProps);
+
+        GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions.setTenantViewColumns(Lists.newArrayList(TENANT_VIEW_COLUMNS));
+        tenantViewOptions.setTenantViewColumnTypes(Lists.newArrayList(COLUMN_TYPES));
+        tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL));
+
+        TenantViewIndexOptions tenantViewIndexOptions = TenantViewIndexOptions.withDefaults();
+
+        // Test cases :
+        // Local vs Global indexes, various column family options.
+        for (boolean isIndex1Local : Lists.newArrayList(true, false)) {
+            for (boolean isIndex2Local : Lists.newArrayList(true, false)) {
+                for (OtherOptions options : getTableAndGlobalAndTenantColumnFamilyOptions()) {
+                    // Define the test schema.
+                    final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+                    schemaBuilder.withTableOptions(tableOptions)
+                            .withGlobalViewOptions(globalViewOptions)
+                            .withTenantViewOptions(tenantViewOptions)
+                            .withTenantViewIndexOptions(tenantViewIndexOptions)
+                            .withOtherOptions(options).build();
+
+                    PTable table = schemaBuilder.getBaseTable();
+                    String schemaName = table.getSchemaName().getString();
+                    String tableName = table.getTableName().getString();
+
+                    long earliestTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+                    Map<String, List<String>> mapOfTenantIndexes = Maps.newHashMap();
+                    // Create multiple tenants, add data and run validations
+                    for (int tenant : Arrays.asList(new Integer[] { 1, 2, 3 })) {
+                        // build schema for tenant
+                        schemaBuilder.buildWithNewTenant();
+
+                        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+                        String tenantConnectUrl = getUrl() + ';' + TENANT_ID_ATTRIB + '='
+                                + tenantId;
+                        try (Connection tenantConn = DriverManager.getConnection(tenantConnectUrl);
+                                final Statement statement = tenantConn.createStatement()) {
+                            PhoenixConnection phxConn = tenantConn.unwrap(PhoenixConnection.class);
+
+                            String index1Name = String.format("IDX_%s_%s",
+                                    schemaBuilder.getEntityTenantViewName().replaceAll("\\.", "_"),
+                                    "COL9");
+
+                            final String index1Str = String.format("CREATE %s INDEX IF NOT EXISTS "
+                                            + "%s ON %s (%s) INCLUDE (%s)", isIndex1Local ? "LOCAL" : "",
+                                    index1Name, schemaBuilder.getEntityTenantViewName(), "COL9",
+                                    "COL8");
+                            statement.execute(index1Str);
+
+                            String index2Name = String.format("IDX_%s_%s",
+                                    schemaBuilder.getEntityTenantViewName().replaceAll("\\.", "_"),
+                                    "COL7");
+
+                            final String index2Str = String.format("CREATE %s INDEX IF NOT EXISTS "
+                                            + "%s ON %s (%s) INCLUDE (%s)", isIndex2Local ? "LOCAL" : "",
+                                    index2Name, schemaBuilder.getEntityTenantViewName(), "COL7",
+                                    "COL8");
+                            statement.execute(index2Str);
+
+                            String defaultViewIndexName = String.format("IDX_%s", SchemaUtil
+                                    .getTableNameFromFullName(
+                                            schemaBuilder.getEntityTenantViewName()));
+                            // Collect the indexes for the tenants
+                            mapOfTenantIndexes.put(tenantId,
+                                    Arrays.asList(defaultViewIndexName, index1Name, index2Name));
+                        }
+
+                        // Define the test data.
+                        DataSupplier dataSupplier = new DataSupplier() {
+
+                            @Override public List<Object> getValues(int rowIndex) {
+                                Random rnd = new Random();
+                                String id = String.format(ID_FMT, rowIndex);
+                                String col4 = String
+                                        .format(COL4_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col5 = String
+                                        .format(COL5_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col6 = String
+                                        .format(COL6_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col7 = String
+                                        .format(COL7_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col8 = String
+                                        .format(COL8_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+                                String col9 = String
+                                        .format(COL9_FMT, rowIndex + rnd.nextInt(MAX_ROWS));
+
+                                return Lists.newArrayList(
+                                        new Object[] { id, col4, col5, col6, col7, col8, col9 });
+                            }
+                        };
+
+                        // Create a test data reader/writer for the above schema.
+                        DataWriter dataWriter = new BasicDataWriter();
+                        List<String> columns = Lists
+                                .newArrayList("ID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+                        List<String> rowKeyColumns = Lists.newArrayList("ID");
+                        try (Connection writeConnection = DriverManager
+                                .getConnection(tenantConnectUrl)) {
+                            writeConnection.setAutoCommit(true);
+                            dataWriter.setConnection(writeConnection);
+                            dataWriter.setDataSupplier(dataSupplier);
+                            dataWriter.setUpsertColumns(columns);
+                            dataWriter.setRowKeyColumns(rowKeyColumns);
+                            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                            // Case : count(1) sql
+                            DataReader dataReader = new BasicDataReader();
+                            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                            dataReader.setDML(String
+                                    .format("SELECT /* +NO_INDEX */ count(1) as num_rows from %s HAVING count(1) > 0",
+                                            schemaBuilder.getEntityTenantViewName()));
+                            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                            // Validate data before and after ttl expiration.
+                            validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader,
+                                    schemaBuilder);
+                        }
+                    }
+                    // Delete data and index rows
+                    long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis() + (phoenixTTL
+                            * 1000);
+
+                    for (Map.Entry<String, List<String>> entry : mapOfTenantIndexes.entrySet()) {
+                        // Delete expired data rows using tenant connection.
+                        deleteData(false, entry.getKey(), schemaBuilder.getEntityTenantViewName(),
+                                scnTimestamp);
+                        for (String indexName : entry.getValue()) {
+                            // Delete expired index rows using tenant connection.
+                            String viewIndexName = String.format("%s.%s", schemaName, indexName);
+                            deleteIndexData(false, entry.getKey(), viewIndexName, scnTimestamp);
+                        }
+                    }
+
+                    byte[] hbaseBaseTableName = SchemaUtil
+                            .getTableNameAsBytes(schemaName, tableName);
+                    String viewIndexSchemaName = String.format("_IDX_%s", schemaName);
+                    byte[] hbaseViewIndexTableName = SchemaUtil
+                            .getTableNameAsBytes(viewIndexSchemaName, tableName);
+
+                    assertUsingHBaseRows(hbaseBaseTableName, earliestTimestamp, 0);
+                    assertUsingHBaseRows(hbaseViewIndexTableName, earliestTimestamp, 0);
+                }
+            }
+        }
+    }
+
+    private void upsertDataAndRunValidations(long phoenixTTL, int numRowsToUpsert,
+            DataWriter dataWriter, DataReader dataReader, SchemaBuilder schemaBuilder)
+            throws Exception {
+
+        //Insert for the first time and validate them.
+        validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, upsertData(dataWriter, numRowsToUpsert),
+                dataReader, schemaBuilder);
+
+        // Update the above rows and validate the same.
+        validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, upsertData(dataWriter, numRowsToUpsert),
+                dataReader, schemaBuilder);
+
+    }
+
+    private void validateExpiredRowsAreNotReturnedUsingCounts(long phoenixTTL, DataReader dataReader,
+            SchemaBuilder schemaBuilder) throws SQLException {
+
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+
+        // Verify before TTL expiration
+        try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl)) {
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData
+                    = fetchData(dataReader);
+            assertNotNull("Fetched data should not be null", fetchedData);
+            assertTrue("Rows should exists before expiration",
+                    fetchedData.rowKeySet().size() > 0);
+        }
+
+        // Verify after TTL expiration
+        long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp + (2 * phoenixTTL * 1000)));
+        try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData =
+                    fetchData(dataReader);
+            assertNotNull("Fetched data should not be null", fetchedData);
+            assertEquals("Expired rows should not be fetched", 0,
+                    fetchedData.rowKeySet().size());
+        }
+    }
+
+    private void validateExpiredRowsAreNotReturnedUsingData(long phoenixTTL,
+            com.google.common.collect.Table<String, String, Object> upsertedData,
+            DataReader dataReader, SchemaBuilder schemaBuilder) throws SQLException {
+
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId();
+
+        // Verify before TTL expiration
+        Properties props = new Properties();
+        long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+        try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object>
+                    fetchedData =
+                    fetchData(dataReader);
+            assertNotNull("Upserted data should not be null", upsertedData);
+            assertNotNull("Fetched data should not be null", fetchedData);
+
+            verifyRowsBeforeTTLExpiration(upsertedData, fetchedData);
+        }
+
+        // Verify after TTL expiration
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp + (2 * phoenixTTL * 1000)));
+        try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object>
+                    fetchedData =
+                    fetchData(dataReader);
+            assertNotNull("Fetched data should not be null", fetchedData);
+            assertEquals("Expired rows should not be fetched", 0, fetchedData.rowKeySet().size());
+        }
+
+    }
+
+    private void validateRowsAreNotMaskedUsingCounts(long probeTimestamp, DataReader dataReader,
+            SchemaBuilder schemaBuilder) throws SQLException {
+
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+
+        // Verify rows exists (not masked) at current time
+        long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp ));
+        try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object>
+                    fetchedData =
+                    fetchData(dataReader);
+            assertNotNull("Fetched data should not be null", fetchedData);
+            assertTrue("Rows should exists before ttl expiration (now)",
+                    fetchedData.rowKeySet().size() > 0);
+        }
+
+        // Verify rows exists (not masked) at probed timestamp
+        props.setProperty("CurrentSCN", Long.toString(probeTimestamp));
+        try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object>
+                    fetchedData =
+                    fetchData(dataReader);
+            assertNotNull("Fetched data should not be null", fetchedData);
+            assertTrue("Rows should exists before ttl expiration (probe-timestamp)",
+                    fetchedData.rowKeySet().size() > 0);
+        }
+    }
+
+    private void verifyRowsBeforeTTLExpiration(
+            com.google.common.collect.Table<String, String, Object> upsertedData,
+            com.google.common.collect.Table<String, String, Object> fetchedData) {
+
+        Set<String> upsertedRowKeys = upsertedData.rowKeySet();
+        Set<String> fetchedRowKeys = fetchedData.rowKeySet();
+        assertNotNull("Upserted row keys should not be null", upsertedRowKeys);
+        assertNotNull("Fetched row keys should not be null", fetchedRowKeys);
+        assertEquals(String.format("Rows upserted and fetched do not match, upserted=%d, fetched=%d",
+                upsertedRowKeys.size(), fetchedRowKeys.size()),
+                upsertedRowKeys, fetchedRowKeys);
+
+        Set<String> fetchedCols = fetchedData.columnKeySet();
+        for (String rowKey : fetchedRowKeys) {
+            for (String columnKey : fetchedCols) {
+                Object upsertedValue = upsertedData.get(rowKey, columnKey);
+                Object fetchedValue = fetchedData.get(rowKey, columnKey);
+                assertNotNull("Upserted values should not be null", upsertedValue);
+                assertNotNull("Fetched values should not be null", fetchedValue);
+                assertEquals("Values upserted and fetched do not match",
+                        upsertedValue, fetchedValue);
+            }
+        }
+    }
+
+    private com.google.common.collect.Table<String, String, Object> upsertData(
+            DataWriter dataWriter, int numRowsToUpsert) throws Exception {
+        // Upsert rows
+        dataWriter.upsertRows(1, numRowsToUpsert);
+        return dataWriter.getDataTable();
+    }
+
+    private com.google.common.collect.Table<String, String, Object> fetchData(DataReader dataReader)
+            throws SQLException {
+
+        dataReader.readRows();
+        return dataReader.getDataTable();
+    }
+
+    private void deleteData(
+            boolean useGlobalConnection,
+            String tenantId,
+            String viewName,
+            long scnTimestamp) throws SQLException {
+
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+        String connectUrl = useGlobalConnection ?
+                getUrl() :
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+
+        try (Connection deleteConnection = DriverManager.getConnection(connectUrl, props);
+                final Statement statement = deleteConnection.createStatement()) {
+            deleteConnection.setAutoCommit(true);
+
+            final String deleteIfExpiredStatement = String.format("select * from  %s", viewName);
+            Preconditions.checkNotNull(deleteIfExpiredStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+
+            PTable table = PhoenixRuntime.getTable(deleteConnection, tenantId, viewName);
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] emptyColumnName =
+                    table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+            PhoenixResultSet
+                    rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
+            while (rs.next());
+        }
+    }
+
+    private void deleteIndexData(boolean useGlobalConnection,
+            String tenantId,
+            String indexName,
+            long scnTimestamp) throws SQLException {
+
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+        String connectUrl = useGlobalConnection ?
+                getUrl() :
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+
+        try (Connection deleteConnection = DriverManager.getConnection(connectUrl, props);
+                final Statement statement = deleteConnection.createStatement()) {
+            deleteConnection.setAutoCommit(true);
+
+            final String deleteIfExpiredStatement = String.format("select * from %s", indexName);
+            Preconditions.checkNotNull(deleteIfExpiredStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+
+            PTable table = PhoenixRuntime.getTable(deleteConnection, tenantId, indexName);
+
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] emptyColumnName =
+                    table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+            PhoenixResultSet
+                    rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
+            while (rs.next());
+        }
+    }
+
+    private List<OtherOptions> getTableAndGlobalAndTenantColumnFamilyOptions() {
+
+        List<OtherOptions> testCases = Lists.newArrayList();
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTenantViewCFs(Lists.newArrayList((String) null, null, null));
+        testCases.add(testCaseWhenAllCFMatchAndAllDefault);
+
+        OtherOptions testCaseWhenAllCFMatchAndSame = new OtherOptions();
+        testCaseWhenAllCFMatchAndSame.setTestName("testCaseWhenAllCFMatchAndSame");
+        testCaseWhenAllCFMatchAndSame.setTableCFs(Lists.newArrayList("A", "A", "A"));
+        testCaseWhenAllCFMatchAndSame.setGlobalViewCFs(Lists.newArrayList("A", "A", "A"));
+        testCaseWhenAllCFMatchAndSame.setTenantViewCFs(Lists.newArrayList("A", "A", "A"));
+        testCases.add(testCaseWhenAllCFMatchAndSame);
+
+        OtherOptions testCaseWhenAllCFMatch = new OtherOptions();
+        testCaseWhenAllCFMatch.setTestName("testCaseWhenAllCFMatch");
+        testCaseWhenAllCFMatch.setTableCFs(Lists.newArrayList(null, "A", "B"));
+        testCaseWhenAllCFMatch.setGlobalViewCFs(Lists.newArrayList(null, "A", "B"));
+        testCaseWhenAllCFMatch.setTenantViewCFs(Lists.newArrayList(null, "A", "B"));
+        testCases.add(testCaseWhenAllCFMatch);
+
+        OtherOptions testCaseWhenTableCFsAreDiff = new OtherOptions();
+        testCaseWhenTableCFsAreDiff.setTestName("testCaseWhenTableCFsAreDiff");
+        testCaseWhenTableCFsAreDiff.setTableCFs(Lists.newArrayList(null, "A", "B"));
+        testCaseWhenTableCFsAreDiff.setGlobalViewCFs(Lists.newArrayList("A", "A", "B"));
+        testCaseWhenTableCFsAreDiff.setTenantViewCFs(Lists.newArrayList("A", "A", "B"));
+        testCases.add(testCaseWhenTableCFsAreDiff);
+
+        OtherOptions testCaseWhenGlobalAndTenantCFsAreDiff = new OtherOptions();
+        testCaseWhenGlobalAndTenantCFsAreDiff.setTestName("testCaseWhenGlobalAndTenantCFsAreDiff");
+        testCaseWhenGlobalAndTenantCFsAreDiff.setTableCFs(Lists.newArrayList(null, "A", "B"));
+        testCaseWhenGlobalAndTenantCFsAreDiff.setGlobalViewCFs(Lists.newArrayList("A", "A", "A"));
+        testCaseWhenGlobalAndTenantCFsAreDiff.setTenantViewCFs(Lists.newArrayList("B", "B", "B"));
+        testCases.add(testCaseWhenGlobalAndTenantCFsAreDiff);
+
+        return testCases;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 0359bfa..007bb23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -37,14 +37,13 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 
 import java.sql.SQLException;
 import java.util.Collections;
 
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
-import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
+import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan;
 
 
 /**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 0b97e52..a3857c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -62,7 +62,6 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
 
-
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     private static final Log LOG = LogFactory.getLog(BaseScannerRegionObserver.class);
 
@@ -120,6 +119,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB = "_RunUpdateStatsAsync";
     public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
     public static final String TX_SCN = "_TxScn";
+    public static final String PHOENIX_TTL = "_PhoenixTTL";
+    public static final String MASK_PHOENIX_TTL_EXPIRED = "_MASK_TTL_EXPIRED";
+    public static final String DELETE_PHOENIX_TTL_EXPIRED = "_DELETE_TTL_EXPIRED";
+    public static final String PHOENIX_TTL_SCAN_TABLE_NAME = "_PhoenixTTLScanTableName";
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
     public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS";
     public final static String SCAN_OFFSET = "_RowOffset";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
new file mode 100644
index 0000000..f6c1b96
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixTTLSource;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+
+/**
+ * Coprocessor that checks whether the row is expired based on the TTL spec.
+ */
+public class PhoenixTTLRegionObserver extends BaseRegionObserver {
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixTTLRegionObserver.class);
+    private MetricsPhoenixTTLSource metricSource;
+
+    @Override public void start(CoprocessorEnvironment e) throws IOException {
+        super.start(e);
+        metricSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixTTLSource();
+    }
+
+    @Override public void stop(CoprocessorEnvironment e) throws IOException {
+        super.stop(e);
+    }
+
+    @Override
+    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
+            RegionScanner s) throws IOException {
+
+        if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) {
+            return s;
+        } else if (ScanUtil.isMaskTTLExpiredRows(scan) && ScanUtil.isDeleteTTLExpiredRows(scan)) {
+            throw new IOException("Both mask and delete expired rows property cannot be set");
+        } else if (ScanUtil.isMaskTTLExpiredRows(scan)) {
+            metricSource.incrementMaskExpiredRequestCount();
+            scan.setAttribute(PhoenixTTLRegionScanner.MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR,
+                    Bytes.toBytes(String.format("MASK-EXPIRED-%d",
+                            metricSource.getMaskExpiredRequestCount())));
+        } else if (ScanUtil.isDeleteTTLExpiredRows(scan)) {
+            metricSource.incrementDeleteExpiredRequestCount();
+            scan.setAttribute(PhoenixTTLRegionScanner.MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR,
+                    Bytes.toBytes(String.format("DELETE-EXPIRED-%d",
+                            metricSource.getDeleteExpiredRequestCount())));
+        }
+        LOG.trace(String.format(
+                "********** PHOENIX-TTL: PhoenixTTLRegionObserver::postScannerOpen TTL for table = "
+                        + "[%s], scan = [%s], PHOENIX_TTL = %d ***************, "
+                        + "numMaskExpiredRequestCount=%d, "
+                        + "numDeleteExpiredRequestCount=%d",
+                s.getRegionInfo().getTable().getNameAsString(),
+                scan.toJSON(Integer.MAX_VALUE),
+                ScanUtil.getPhoenixTTL(scan),
+                metricSource.getMaskExpiredRequestCount(),
+                metricSource.getDeleteExpiredRequestCount()
+        ));
+        return new PhoenixTTLRegionScanner(c.getEnvironment(), scan, s);
+    }
+
+    /**
+     * A region scanner that checks the TTL expiration of rows
+     */
+    private static class PhoenixTTLRegionScanner implements RegionScanner {
+        private static final String MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR =
+                "MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID";
+
+        private final RegionScanner scanner;
+        private final Scan scan;
+        private final byte[] emptyCF;
+        private final byte[] emptyCQ;
+        private final Region region;
+        private final long minTimestamp;
+        private final long maxTimestamp;
+        private final long now;
+        private final boolean deleteIfExpired;
+        private final boolean maskIfExpired;
+        private final String requestId;
+        private final byte[] scanTableName;
+        private long numRowsExpired;
+        private long numRowsScanned;
+        private long numRowsDeleted;
+        private boolean reported = false;
+
+        public PhoenixTTLRegionScanner(RegionCoprocessorEnvironment env, Scan scan,
+                RegionScanner scanner) throws IOException {
+            this.scan = scan;
+            this.scanner = scanner;
+            byte[] requestIdBytes = scan.getAttribute(MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR);
+            this.requestId = Bytes.toString(requestIdBytes);
+
+            deleteIfExpired = ScanUtil.isDeleteTTLExpiredRows(scan);
+            maskIfExpired = !deleteIfExpired && ScanUtil.isMaskTTLExpiredRows(scan);
+
+            region = env.getRegion();
+            emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+            emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+            scanTableName = scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME);
+
+            byte[] txnScn = scan.getAttribute(BaseScannerRegionObserver.TX_SCN);
+            if (txnScn != null) {
+                TimeRange timeRange = scan.getTimeRange();
+                scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
+            }
+            minTimestamp = scan.getTimeRange().getMin();
+            maxTimestamp = scan.getTimeRange().getMax();
+            now = maxTimestamp != HConstants.LATEST_TIMESTAMP ?
+                            maxTimestamp :
+                            EnvironmentEdgeManager.currentTimeMillis();
+        }
+
+        @Override public int getBatch() {
+            return scanner.getBatch();
+        }
+
+        @Override public long getMaxResultSize() {
+            return scanner.getMaxResultSize();
+        }
+
+        @Override public boolean next(List<Cell> result) throws IOException {
+            return doNext(result, false);
+        }
+
+        @Override public boolean next(List<Cell> result, ScannerContext scannerContext)
+                throws IOException {
+            throw new IOException(
+                    "next with scannerContext should not be called in Phoenix environment");
+        }
+
+        @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
+                throws IOException {
+            throw new IOException(
+                    "NextRaw with scannerContext should not be called in Phoenix environment");
+        }
+
+        @Override public void close() throws IOException {
+            if (!reported) {
+                LOG.debug(String.format(
+                        "PHOENIX-TTL-SCAN-STATS-ON-CLOSE: " + "request-id:[%s,%s] = [%d, %d, %d]",
+                        this.requestId, Bytes.toString(scanTableName),
+                        this.numRowsScanned, this.numRowsExpired, this.numRowsDeleted));
+                reported = true;
+            }
+            scanner.close();
+        }
+
+        @Override public HRegionInfo getRegionInfo() {
+            return scanner.getRegionInfo();
+        }
+
+        @Override public boolean isFilterDone() throws IOException {
+            return scanner.isFilterDone();
+        }
+
+        @Override public boolean reseek(byte[] row) throws IOException {
+            return scanner.reseek(row);
+        }
+
+        @Override public long getMvccReadPoint() {
+            return scanner.getMvccReadPoint();
+        }
+
+        @Override public boolean nextRaw(List<Cell> result) throws IOException {
+            return doNext(result, true);
+        }
+
+        private boolean doNext(List<Cell> result, boolean raw) throws IOException {
+            try {
+                boolean hasMore;
+                do {
+                    hasMore = raw ? scanner.nextRaw(result) : scanner.next(result);
+                    if (result.isEmpty()) {
+                        break;
+                    }
+                    numRowsScanned++;
+                    if (maskIfExpired && checkRowNotExpired(result)) {
+                        break;
+                    }
+
+                    if (deleteIfExpired && deleteRowIfExpired(result)) {
+                        numRowsDeleted++;
+                        break;
+                    }
+                    // skip this row
+                    // 1. if the row has expired (checkRowNotExpired returned false)
+                    // 2. if the row was not deleted (deleteRowIfExpired returned false and
+                    //  do not want it to count towards the deleted count)
+                    if (maskIfExpired) {
+                        numRowsExpired++;
+                    }
+                    result.clear();
+                } while (hasMore);
+                return hasMore;
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                return false; // impossible
+            }
+        }
+
+        /**
+         * @param cellList is an input and output parameter and will either include a valid row or be an empty list
+         * @return true if row expired and deleted or empty, otherwise false
+         * @throws IOException
+         */
+        private boolean deleteRowIfExpired(List<Cell> cellList) throws IOException {
+
+            long cellListSize = cellList.size();
+            if (cellListSize == 0) {
+                return true;
+            }
+
+            Iterator<Cell> cellIterator = cellList.iterator();
+            Cell firstCell = cellIterator.next();
+            byte[] rowKey = new byte[firstCell.getRowLength()];
+            System.arraycopy(firstCell.getRowArray(), firstCell.getRowOffset(), rowKey, 0,
+                    firstCell.getRowLength());
+
+            boolean isRowExpired = !checkRowNotExpired(cellList);
+            if (isRowExpired) {
+                long ttl = ScanUtil.getPhoenixTTL(this.scan);
+                long ts = ScanUtil.getMaxTimestamp(cellList);
+                LOG.trace(String.format(
+                        "PHOENIX-TTL: Deleting row = [%s] belonging to table = %s, "
+                                + "scn = %s, now = %d, delete-ts = %d, max-ts = %d",
+                        Bytes.toString(rowKey),
+                        Bytes.toString(scanTableName),
+                        maxTimestamp != HConstants.LATEST_TIMESTAMP,
+                        now, now - ttl, ts));
+                Delete del = new Delete(rowKey, now - ttl);
+                Mutation[] mutations = new Mutation[] { del };
+                region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * @param cellList is an input and output parameter and will either include a valid row
+         *                 or be an empty list
+         * @return true if row not expired, otherwise false
+         * @throws IOException
+         */
+        private boolean checkRowNotExpired(List<Cell> cellList) throws IOException {
+            long cellListSize = cellList.size();
+            Cell cell = null;
+            if (cellListSize == 0) {
+                return true;
+            }
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                cell = cellIterator.next();
+                if (ScanUtil.isEmptyColumn(cell, this.emptyCF, this.emptyCQ)) {
+                    LOG.trace(String.format("** PHOENIX-TTL: Row expired for [%s], expired = %s **",
+                            cell.toString(), ScanUtil.isTTLExpired(cell, this.scan, this.now)));
+                    // Empty column is not supposed to be returned to the client
+                    // except when it is the only column included in the scan.
+                    if (cellListSize > 1) {
+                        cellIterator.remove();
+                    }
+                    return !ScanUtil.isTTLExpired(cell, this.scan, this.now);
+                }
+            }
+            LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo()
+                    .getTable().getNameAsString());
+            return true;
+        }
+
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixCoprocessorSourceFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixCoprocessorSourceFactory.java
new file mode 100644
index 0000000..47ee949
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixCoprocessorSourceFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor.metrics;
+/**
+ * Factory object to create various metric sources for phoenix related coprocessors.
+ */
+public class MetricsPhoenixCoprocessorSourceFactory {
+
+    private static final MetricsPhoenixCoprocessorSourceFactory
+            INSTANCE = new MetricsPhoenixCoprocessorSourceFactory();
+    // Holds the PHOENIX_TTL related metrics.
+    private static volatile MetricsPhoenixTTLSource phoenixTTLSource;
+
+    public static MetricsPhoenixCoprocessorSourceFactory getInstance() {
+        return INSTANCE;
+    }
+
+    // return the metric source for PHOENIX_TTL coproc.
+    public MetricsPhoenixTTLSource getPhoenixTTLSource() {
+        if (INSTANCE.phoenixTTLSource == null) {
+            synchronized (MetricsPhoenixTTLSource.class) {
+                if (INSTANCE.phoenixTTLSource == null) {
+                    INSTANCE.phoenixTTLSource = new MetricsPhoenixTTLSourceImpl();
+                }
+            }
+        }
+        return INSTANCE.phoenixTTLSource;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixTTLSource.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixTTLSource.java
new file mode 100644
index 0000000..44c0633
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixTTLSource.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.phoenix.hbase.index.Indexer;
+
+/**
+ * Interface for metrics about {@link org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver}.
+ */
+public interface MetricsPhoenixTTLSource extends BaseSource {
+
+    // Metrics2 and JMX constants
+    String METRICS_NAME = "PhoenixTTLProcessor";
+    String METRICS_CONTEXT = "phoenix";
+    String METRICS_DESCRIPTION = "Metrics about the Phoenix TTL Coprocessor";
+    String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+
+    String PHOENIX_TTL_MASK_EXPIRED_REQUESTS = "phoenixMaskTTLExpiredRequests";
+    String PHOENIX_TTL_MASK_EXPIRED_REQUESTS_DESC = "The number of scan requests to mask PHOENIX TTL expired rows";
+
+    String PHOENIX_TTL_DELETE_EXPIRED_REQUESTS = "phoenixDeleteTTLExpiredRequests";
+    String PHOENIX_TTL_DELETE_EXPIRED_REQUESTS_DESC = "The number of delete requests to delete PHOENIX TTL expired rows";
+
+    /**
+     * Report the number of requests to mask TTL expired rows.
+     */
+    long getMaskExpiredRequestCount();
+    /**
+     * Keeps track of the number of requests to mask TTL expired rows.
+     */
+    void incrementMaskExpiredRequestCount();
+
+    /**
+     * Report the number of requests to mask TTL expired rows.
+     */
+    long getDeleteExpiredRequestCount();
+    /**
+     * Keeps track of the number of requests to delete TTL expired rows.
+     */
+    void incrementDeleteExpiredRequestCount();
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixTTLSourceImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixTTLSourceImpl.java
new file mode 100644
index 0000000..13f1332
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixTTLSourceImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.coprocessor.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+
+/**
+ * Implementation for tracking {@link org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver} metrics.
+ */
+public class MetricsPhoenixTTLSourceImpl extends BaseSourceImpl implements MetricsPhoenixTTLSource {
+    private final MutableFastCounter maskExpiredRequests;
+    private final MutableFastCounter deleteExpiredRequests;
+
+    public MetricsPhoenixTTLSourceImpl() {
+        this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+    }
+
+    public MetricsPhoenixTTLSourceImpl(String metricsName, String metricsDescription,
+            String metricsContext, String metricsJmxContext) {
+        super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+        maskExpiredRequests = getMetricsRegistry().newCounter(PHOENIX_TTL_MASK_EXPIRED_REQUESTS,
+                PHOENIX_TTL_MASK_EXPIRED_REQUESTS_DESC, 0L);
+
+        deleteExpiredRequests = getMetricsRegistry().newCounter(PHOENIX_TTL_DELETE_EXPIRED_REQUESTS,
+                PHOENIX_TTL_DELETE_EXPIRED_REQUESTS_DESC, 0L);
+
+    }
+
+    @Override public void incrementMaskExpiredRequestCount() {
+        maskExpiredRequests.incr();
+    }
+
+    @Override public long getMaskExpiredRequestCount() {
+        return maskExpiredRequests.value();
+    }
+
+    @Override public long getDeleteExpiredRequestCount() { return deleteExpiredRequests.value(); }
+
+    @Override public void incrementDeleteExpiredRequestCount() { deleteExpiredRequests.incr(); }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 64571af..c576ad8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -58,7 +58,6 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -136,7 +135,8 @@ public class TableResultIterator implements ResultIterator {
         this.caches = caches;
         this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
-        IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
+        ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
+        ScanUtil.setScanAttributesForPhoenixTTL(scan, table, plan.getContext().getConnection());
         scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES);
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 55bb9fe..d76c647 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -167,6 +167,7 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.coprocessor.SequenceRegionObserver;
 import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
+import org.apache.phoenix.coprocessor.PhoenixTTLRegionObserver;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.ChildLinkMetaDataService;
@@ -1124,6 +1125,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                 }
             }
+
+            // The priority for this co-processor should be set higher than the GlobalIndexChecker so that the read repair scans
+            // are intercepted by the TTLAwareRegionObserver and only the rows that are not ttl-expired are returned.
+            if (!SchemaUtil.isSystemTable(tableName)) {
+                if (!descriptor.hasCoprocessor(PhoenixTTLRegionObserver.class.getName())) {
+                    descriptor.addCoprocessor(
+                            PhoenixTTLRegionObserver.class.getName(), null, priority-2, null);
+                }
+            }
+
         } catch (IOException e) {
             throw ServerUtil.parseServerException(e);
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index f8f9bed..748c423 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -332,6 +332,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
     // The number of rows to be scanned in one RPC call
     public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms";
+    // Flag indicating that server side masking of ttl expired rows is enabled.
+    public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled";
 
 
     // Before 4.15 when we created a view we included the parent table column metadata in the view
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index aa9cfe3..c36361d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -97,10 +97,9 @@ import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_STATS_FOR_PARALLELIZATION;
 import static org.apache.phoenix.query.QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED;
 
-import java.util.HashSet;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -159,6 +158,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_TRACING_TRACE_BUFFER_SIZE = 1000;
     public static final int DEFAULT_MAX_INDEXES_PER_TABLE = 10;
     public static final int DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD = 0;
+    public static final boolean DEFAULT_SERVER_SIDE_MASKING_ENABLED = true;
 
     public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE
     //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 2MB
@@ -457,6 +457,7 @@ public class QueryServicesOptions {
             .setIfUnset(TxConstants.TX_PRE_014_CHANGESET_KEY, Boolean.FALSE.toString())
             .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG)
             .setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD, DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD)
+            .setIfUnset(PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, DEFAULT_SERVER_SIDE_MASKING_ENABLED)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 85a4bf3..aa989fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -21,11 +21,9 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
-import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
-import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.PhoenixRuntime.getTable;
 
 import java.io.ByteArrayInputStream;
@@ -39,7 +37,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.cache.Cache;
@@ -60,10 +57,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -85,7 +78,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
@@ -93,9 +85,6 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
-import org.apache.phoenix.filter.ColumnProjectionFilter;
-import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
-import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -103,7 +92,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -143,6 +131,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import com.google.common.collect.Lists;
 
 public class IndexUtil {
+
     public static final String INDEX_COLUMN_NAME_SEP = ":";
     public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
 
@@ -921,124 +910,4 @@ public class IndexUtil {
             throw new IOException(e);
         }
     }
-
-    private static boolean containsOneOrMoreColumn(Scan scan) {
-        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
-        if (familyMap == null || familyMap.isEmpty()) {
-            return false;
-        }
-        for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
-            NavigableSet<byte[]> family = entry.getValue();
-            if (family != null && !family.isEmpty()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private static boolean addEmptyColumnToFilter(Scan scan, byte[] emptyCF, byte[] emptyCQ, Filter filter,  boolean addedEmptyColumn) {
-        if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
-            ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
-            if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-                scan.addColumn(emptyCF, emptyCQ);
-                return true;
-            }
-        }
-        else if (filter instanceof ColumnProjectionFilter) {
-            ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
-            if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-                scan.addColumn(emptyCF, emptyCQ);
-                return true;
-            }
-        }
-        else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
-            ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
-        }
-        else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
-            scan.addColumn(emptyCF, emptyCQ);
-            return true;
-        }
-        return addedEmptyColumn;
-    }
-
-    private static boolean addEmptyColumnToFilterList(Scan scan, byte[] emptyCF, byte[] emptyCQ, FilterList filterList, boolean addedEmptyColumn) {
-        Iterator<Filter> filterIterator = filterList.getFilters().iterator();
-        while (filterIterator.hasNext()) {
-            Filter filter = filterIterator.next();
-            if (filter instanceof FilterList) {
-                if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, (FilterList) filter, addedEmptyColumn)) {
-                    addedEmptyColumn =  true;
-                }
-            } else {
-                if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, addedEmptyColumn)) {
-                    addedEmptyColumn =  true;
-                }
-            }
-        }
-        return addedEmptyColumn;
-    }
-
-    public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
-        boolean addedEmptyColumn = false;
-        Filter filter = scan.getFilter();
-        if (filter != null) {
-            if (filter instanceof FilterList) {
-                if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, (FilterList) filter, addedEmptyColumn)) {
-                    addedEmptyColumn = true;
-                }
-            } else {
-                if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, addedEmptyColumn)) {
-                    addedEmptyColumn = true;
-                }
-            }
-        }
-        if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-            scan.addColumn(emptyCF, emptyCQ);
-        }
-    }
-
-    public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
-        if (table.isTransactional() || table.getType() != PTableType.INDEX) {
-            return;
-        }
-        PTable indexTable = table;
-        if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
-            return;
-        }
-        String schemaName = indexTable.getParentSchemaName().getString();
-        String tableName = indexTable.getParentTableName().getString();
-        PTable dataTable;
-        try {
-            dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
-        } catch (TableNotFoundException e) {
-            // This index table must be being deleted. No need to set the scan attributes
-            return;
-        }
-        // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
-        // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work
-        if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
-            int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
-            String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
-            indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
-        }
-        if (!dataTable.getIndexes().contains(indexTable)) {
-            return;
-        }
-        if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
-            scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
-        }
-        scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
-        scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
-        IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
-        byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-        byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
-        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
-        if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
-            BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
-        }
-        addEmptyColumnToScan(scan, emptyCF, emptyCQ);
-    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index be63fae..fe8c06d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -17,30 +17,16 @@
  */
 package org.apache.phoenix.util;
 
-import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
-import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeMap;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -53,13 +39,19 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.filter.BooleanExpressionFilter;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
 import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
@@ -70,14 +62,35 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 /**
  * 
@@ -87,6 +100,7 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class ScanUtil {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScanUtil.class);
     public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
     public static final int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 0);
 
@@ -957,4 +971,238 @@ public class ScanUtil {
     public static void setClientVersion(Scan scan, int version) {
         scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(version));
     }
+
+    public static boolean isServerSideMaskingEnabled(PhoenixConnection phoenixConnection) {
+        String isServerSideMaskingSet = phoenixConnection.getClientInfo(
+                QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED);
+        return (phoenixConnection.getQueryServices()
+                .getConfiguration().getBoolean(
+                        QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED,
+                        QueryServicesOptions.DEFAULT_SERVER_SIDE_MASKING_ENABLED) ||
+                ((isServerSideMaskingSet != null) && (Boolean.parseBoolean(isServerSideMaskingSet))));
+    }
+
+    public static long getPhoenixTTL(Scan scan) {
+        byte[] phoenixTTL = scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL);
+        if (phoenixTTL == null) {
+            return 0L;
+        }
+        return Bytes.toLong(phoenixTTL);
+    }
+
+    public static boolean isMaskTTLExpiredRows(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED) != null &&
+                (Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED),
+                        PDataType.TRUE_BYTES) == 0)
+                && scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL) != null;
+    }
+
+    public static boolean isDeleteTTLExpiredRows(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED) != null && (
+                Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED),
+                        PDataType.TRUE_BYTES) == 0)
+                && scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL) != null;
+    }
+
+    public static boolean isEmptyColumn(Cell cell, byte[] emptyCF, byte[] emptyCQ) {
+        return Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(),
+                cell.getFamilyLength(), emptyCF, 0, emptyCF.length) == 0 &&
+                Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
+                        cell.getQualifierLength(), emptyCQ, 0, emptyCQ.length) == 0;
+    }
+
+    public static long getMaxTimestamp(List<Cell> cellList) {
+        long maxTs = 0;
+        long ts = 0;
+        Iterator<Cell> cellIterator = cellList.iterator();
+        while (cellIterator.hasNext()) {
+            Cell cell = cellIterator.next();
+            ts = cell.getTimestamp();
+            if (ts > maxTs) {
+                maxTs = ts;
+            }
+        }
+        return maxTs;
+    }
+
+    public static boolean isTTLExpired(Cell cell, Scan scan, long nowTS) {
+        long ts = cell.getTimestamp();
+        long ttl = ScanUtil.getPhoenixTTL(scan);
+        return ts + ttl < nowTS;
+    }
+
+    private static boolean containsOneOrMoreColumn(Scan scan) {
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (familyMap == null || familyMap.isEmpty()) {
+            return false;
+        }
+        for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+            NavigableSet<byte[]> family = entry.getValue();
+            if (family != null && !family.isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean addEmptyColumnToFilter(Scan scan, byte[] emptyCF, byte[] emptyCQ, Filter filter,  boolean addedEmptyColumn) {
+        if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
+            ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
+            if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                scan.addColumn(emptyCF, emptyCQ);
+                return true;
+            }
+        }
+        else if (filter instanceof ColumnProjectionFilter) {
+            ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
+            if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                scan.addColumn(emptyCF, emptyCQ);
+                return true;
+            }
+        }
+        else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
+            ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
+        }
+        else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
+            scan.addColumn(emptyCF, emptyCQ);
+            return true;
+        }
+        return addedEmptyColumn;
+    }
+
+    private static boolean addEmptyColumnToFilterList(Scan scan, byte[] emptyCF, byte[] emptyCQ, FilterList filterList, boolean addedEmptyColumn) {
+        Iterator<Filter> filterIterator = filterList.getFilters().iterator();
+        while (filterIterator.hasNext()) {
+            Filter filter = filterIterator.next();
+            if (filter instanceof FilterList) {
+                if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, (FilterList) filter, addedEmptyColumn)) {
+                    addedEmptyColumn =  true;
+                }
+            } else {
+                if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, addedEmptyColumn)) {
+                    addedEmptyColumn =  true;
+                }
+            }
+        }
+        return addedEmptyColumn;
+    }
+
+    public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
+        boolean addedEmptyColumn = false;
+        Filter filter = scan.getFilter();
+        if (filter != null) {
+            if (filter instanceof FilterList) {
+                if (addEmptyColumnToFilterList(scan, emptyCF, emptyCQ, (FilterList) filter, addedEmptyColumn)) {
+                    addedEmptyColumn = true;
+                }
+            } else {
+                if (addEmptyColumnToFilter(scan, emptyCF, emptyCQ, filter, addedEmptyColumn)) {
+                    addedEmptyColumn = true;
+                }
+            }
+        }
+        if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+            scan.addColumn(emptyCF, emptyCQ);
+        }
+    }
+
+    public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
+        if (table.isTransactional() || table.getType() != PTableType.INDEX) {
+            return;
+        }
+        PTable indexTable = table;
+        if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
+            return;
+        }
+        String schemaName = indexTable.getParentSchemaName().getString();
+        String tableName = indexTable.getParentTableName().getString();
+        PTable dataTable;
+        try {
+            dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
+        } catch (TableNotFoundException e) {
+            // This index table must be being deleted. No need to set the scan attributes
+            return;
+        }
+        // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
+        // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work
+        if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+            int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+            String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
+            indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
+        }
+        if (!dataTable.getIndexes().contains(indexTable)) {
+            return;
+        }
+        if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
+            scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+        }
+        scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+        scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
+        IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
+        byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+        byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+        if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
+            BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+        }
+        addEmptyColumnToScan(scan, emptyCF, emptyCQ);
+    }
+
+    public static void setScanAttributesForPhoenixTTL(Scan scan, PTable table,
+            PhoenixConnection phoenixConnection) throws SQLException {
+
+        // If server side masking for PHOENIX_TTL is not enabled OR is a SYSTEM table then return.
+        if (!ScanUtil.isServerSideMaskingEnabled(phoenixConnection) || SchemaUtil.isSystemTable(
+                SchemaUtil.getTableNameAsBytes(table.getSchemaName().getString(),
+                        table.getTableName().getString()))) {
+            return;
+        }
+
+        PTable dataTable = table;
+        String tableName = table.getTableName().getString();
+        if ((table.getType() == PTableType.INDEX) && (table.getParentName() != null)) {
+            String parentSchemaName = table.getParentSchemaName().getString();
+            String parentTableName = table.getParentTableName().getString();
+            // Look up the parent view as we could have inherited this index from an ancestor
+            // view(V) with Index (VIndex) -> child view (V1) -> grand child view (V2)
+            // the view index name will be V2#V1#VIndex
+            // Since we store PHOENIX_TTL at every level, all children have the same value.
+            // So looking at the child view is sufficient.
+            if (tableName.contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+                String parentViewName =
+                        SchemaUtil.getSchemaNameFromFullName(tableName,
+                                QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+                parentSchemaName = SchemaUtil.getSchemaNameFromFullName(parentViewName);
+                parentTableName = SchemaUtil.getTableNameFromFullName(parentViewName);
+            }
+            try {
+                dataTable = PhoenixRuntime.getTable(phoenixConnection,
+                        SchemaUtil.getTableName(parentSchemaName, parentTableName));
+            } catch (TableNotFoundException e) {
+                // This data table does not exists anymore. No need to set the scan attributes
+                return;
+            }
+        }
+
+        if (dataTable.getPhoenixTTL() != 0) {
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] emptyColumnName =
+                    table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME,
+                    Bytes.toBytes(tableName));
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL,
+                    Bytes.toBytes(Long.valueOf(dataTable.getPhoenixTTL())));
+            if (!ScanUtil.isDeleteTTLExpiredRows(scan)) {
+                scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            }
+            addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName);
+        }
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
index ccc378f..ded8f7b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
@@ -20,7 +20,11 @@ package org.apache.phoenix.query;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeBasedTable;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -31,9 +35,12 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Arrays.asList;
@@ -186,7 +193,44 @@ public class PhoenixTestBuilder {
     // Test Data supplier interface for test writers to provide custom data.
     public interface DataSupplier {
         // return the values to be used for upserting data into the underlying entity.
-        List<Object> getValues(int rowIndex);
+        List<Object> getValues(int rowIndex) throws Exception;
+    }
+
+    // A Data Reader to be used in tests to read test data from test db.
+    public interface DataReader {
+        // returns the columns that need to be projected during DML queries,
+        List<String> getValidationColumns();
+
+        void setValidationColumns(List<String> validationColumns);
+
+        // returns the columns that represent the pk/unique key for this data set,
+        List<String> getRowKeyColumns();
+
+        void setRowKeyColumns(List<String> rowKeyColumns);
+
+        // returns the connection to be used for DML queries.
+        Connection getConnection();
+
+        // The connection to be used for the Reader.
+        // Please make sure the connection is closed after use or called with try-with resources.
+        void setConnection(Connection connection);
+
+        // returns the target entity - whether to use the table, global-view, the tenant-view or
+        // an index table.
+        String getTargetEntity();
+
+        void setTargetEntity(String targetEntity);
+
+        // Build the DML statement and return the SQL string.
+        String getDML();
+
+        String setDML(String dmlStatement);
+
+        // template method to read a batch of rows using the above sql.
+        void readRows() throws SQLException;
+
+        // Get the data that was read as a Table.
+        Table<String, String, Object> getDataTable();
     }
 
     // A Data Writer to be used in tests to upsert sample data (@see TestDataSupplier) into the sample schema.
@@ -205,6 +249,8 @@ public class PhoenixTestBuilder {
         // returns the connection to be used for upserting rows.
         Connection getConnection();
 
+        // The connection to be used for the Writer.
+        // Please make sure the connection is closed after use or called with try-with resources.
         void setConnection(Connection connection);
 
         // returns the target entity - whether to use the table, global-view or the tenant-view.
@@ -212,78 +258,156 @@ public class PhoenixTestBuilder {
 
         void setTargetEntity(String targetEntity);
 
+        // returns the columns that is set asthe pk/unique key for this data set,
+        List<String> getRowKeyColumns();
+
+        void setRowKeyColumns(List<String> rowKeyColumns);
+
         // return the data provider for this writer
         DataSupplier getTestDataSupplier();
 
-        // template method to upsert rows using the above info.
-        void upsertRow(int rowIndex) throws SQLException;
-
         void setDataSupplier(DataSupplier dataSupplier);
+
+        // template method to upsert a single row using the above info.
+        List<Object> upsertRow(int rowIndex) throws Exception;
+
+        // template method to upsert a batch of rows using the above info.
+        void upsertRows(int startRowIndex, int numRows) throws Exception;
+
+        // Get the data that was written as a Table
+        Table<String, String, Object> getDataTable();
     }
 
-    /**
-     * Test SchemaBuilder defaults.
-     */
-    public static class DDLDefaults {
-        public static final int MAX_ROWS = 10000;
-        public static List<String> TABLE_PK_TYPES = asList("CHAR(15)", "CHAR(3)");
-        public static List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)");
-        public static List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)");
+    // Provides template method for returning result set
+    public static abstract class AbstractDataReader implements DataReader {
+        Table<String, String, Object> dataTable = TreeBasedTable.create();
 
-        public static List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR", "VARCHAR");
-        public static List<String> TABLE_COLUMNS = asList("COL1", "COL2", "COL3");
-        public static List<String> GLOBAL_VIEW_COLUMNS = asList("COL4", "COL5", "COL6");
-        public static List<String> TENANT_VIEW_COLUMNS = asList("COL7", "COL8", "COL9");
+        public Table<String, String, Object> getDataTable() {
+            return dataTable;
+        }
 
-        public static List<String> TABLE_COLUMN_FAMILIES = asList(null, null, null);
-        public static List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null, null, null);
-        public static List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null, null, null);
+        // Read batch of rows
+        public void readRows() throws SQLException {
+            dataTable.clear();
+            dataTable = TreeBasedTable.create();
+            String sql = getDML();
+            Connection connection = getConnection();
+            try (Statement stmt = connection.createStatement()) {
+
+                final PhoenixStatement pstmt = stmt.unwrap(PhoenixStatement.class);
+                ResultSet rs = pstmt.executeQuery(sql);
+                List<String> cols = getValidationColumns();
+                List<Object> values = Lists.newArrayList();
+                Set<String> rowKeys = getRowKeyColumns() == null || getRowKeyColumns().isEmpty() ?
+                        Sets.<String>newHashSet() :
+                        Sets.newHashSet(getRowKeyColumns());
+                List<String> rowKeyParts = Lists.newArrayList();
+                while (rs.next()) {
+                    for (String col : cols) {
+                        Object val = rs.getObject(col);
+                        values.add(val);
+                        if (rowKeys.isEmpty()) {
+                            rowKeyParts.add(val.toString());
+                        }
+                        else if (rowKeys.contains(col)) {
+                            rowKeyParts.add(val.toString());
+                        }
+                    }
 
-        public static List<String> TABLE_PK_COLUMNS = asList("OID", "KP");
-        public static List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID");
-        public static List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID");
+                    String rowKey = Joiner.on("-").join(rowKeyParts);
+                    for (int v = 0; v < values.size(); v++) {
+                        dataTable.put(rowKey,cols.get(v), values.get(v));
+                    }
+                    values.clear();
+                    rowKeyParts.clear();
+                }
+                LOGGER.info(String.format("########## rows: %d", dataTable.rowKeySet().size()));
 
-        public static List<String> TABLE_INDEX_COLUMNS = asList("COL1");
-        public static List<String> TABLE_INCLUDE_COLUMNS = asList("COL3");
+            } catch (SQLException e) {
+                LOGGER.error(String.format(" Error [%s] initializing Reader. ",
+                        e.getMessage()));
+                throw e;
+            }
+        }
+    }
+
+    // An implementation of the DataReader.
+    public static class BasicDataReader extends AbstractDataReader {
+
+        Connection connection;
+        String targetEntity;
+        String dmlStatement;
+        List<String> validationColumns;
+        List<String> rowKeyColumns;
 
-        public static List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4");
-        public static List<String> GLOBAL_VIEW_INCLUDE_COLUMNS = asList("COL6");
 
-        public static List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9");
-        public static List<String> TENANT_VIEW_INCLUDE_COLUMNS = asList("COL7");
+        @Override public String getDML() {
+            return this.dmlStatement;
+        }
 
-        public static String
-                DEFAULT_TABLE_PROPS =
-                "COLUMN_ENCODED_BYTES=0, MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'";
-        public static String DEFAULT_TABLE_INDEX_PROPS = "";
-        public static String DEFAULT_GLOBAL_VIEW_PROPS = "";
-        public static String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = "";
-        public static String DEFAULT_TENANT_VIEW_PROPS = "";
-        public static String DEFAULT_TENANT_VIEW_INDEX_PROPS = "";
-        public static String DEFAULT_KP = "0EC";
-        public static String DEFAULT_SCHEMA_NAME = "TEST_ENTITY";
-        public static String DEFAULT_TENANT_ID_FMT = "00D0t%03d%s";
+        @Override public String setDML(String dmlStatement) {
+            return this.dmlStatement = dmlStatement;
+        }
+
+        // returns the columns that need to be projected during DML queries,
+        @Override public List<String> getValidationColumns() {
+            return this.validationColumns;
+        }
+
+        @Override public void setValidationColumns(List<String> validationColumns) {
+            this.validationColumns = validationColumns;
+        }
+
+        // returns the columns that is set as the pk/unique key for this data set,
+        @Override public  List<String> getRowKeyColumns() {
+            return this.rowKeyColumns;
+        }
+
+        @Override public void setRowKeyColumns(List<String> rowKeyColumns) {
+            this.rowKeyColumns = rowKeyColumns;
+        }
+
+        @Override public Connection getConnection() {
+            return connection;
+        }
 
-        public static String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost";
+        @Override public void setConnection(Connection connection) {
+            this.connection = connection;
+        }
 
+        @Override public String getTargetEntity() {
+            return targetEntity;
+        }
+
+        @Override public void setTargetEntity(String targetEntity) {
+            this.targetEntity = targetEntity;
+        }
     }
 
+
     // Provides template method for upserting rows
     public static abstract class AbstractDataWriter implements DataWriter {
+        Table<String, String, Object> dataTable = TreeBasedTable.create();
+
+        public Table<String, String, Object> getDataTable() {
+            return dataTable;
+        }
 
-        public void upsertRow(int rowIndex) throws SQLException {
+        // Upsert one row.
+        public List<Object> upsertRow(int rowIndex) throws Exception {
             List<String> upsertColumns = Lists.newArrayList();
             List<Object> upsertValues = Lists.newArrayList();
 
+            List<Object> rowValues = null;
+            rowValues = getTestDataSupplier().getValues(rowIndex);
             if (getColumnPositionsToUpdate().isEmpty()) {
                 upsertColumns.addAll(getUpsertColumns());
-                upsertValues.addAll(getTestDataSupplier().getValues(rowIndex));
+                upsertValues.addAll(rowValues);
             } else {
-                List<String> tmpColumns = getUpsertColumns();
-                List<Object> tmpValues = getTestDataSupplier().getValues(rowIndex);
+                List<String> columnsToUpdate = getUpsertColumns();
                 for (int i : getColumnPositionsToUpdate()) {
-                    upsertColumns.add(tmpColumns.get(i));
-                    upsertValues.add(tmpValues.get(i));
+                    upsertColumns.add(columnsToUpdate.get(i));
+                    upsertValues.add(rowValues.get(i));
                 }
             }
             StringBuilder buf = new StringBuilder("UPSERT INTO ");
@@ -294,27 +418,101 @@ public class PhoenixTestBuilder {
                 buf.append("?,");
             }
             buf.setCharAt(buf.length() - 1, ')');
-
-            LOGGER.info(buf.toString());
+            LOGGER.debug(buf.toString());
 
             Connection connection = getConnection();
             try (PreparedStatement stmt = connection.prepareStatement(buf.toString())) {
                 for (int i = 0; i < upsertValues.size(); i++) {
+                    //TODO : handle null values
                     stmt.setObject(i + 1, upsertValues.get(i));
                 }
                 stmt.execute();
                 connection.commit();
             }
+            return upsertValues;
+        }
+
+        // Upsert batch of rows.
+        public void upsertRows(int startRowIndex, int numRows) throws Exception {
+            dataTable.clear();
+            dataTable = TreeBasedTable.create();
+            List<String> upsertColumns = Lists.newArrayList();
+            List<Integer> rowKeyPositions = Lists.newArrayList();
+
+            // Figure out the upsert columns based on whether this is a full or partial row update.
+            boolean isFullRowUpdate = getColumnPositionsToUpdate().isEmpty();
+            if (isFullRowUpdate) {
+                upsertColumns.addAll(getUpsertColumns());
+            } else {
+                List<String> tmpColumns = getUpsertColumns();
+                for (int i : getColumnPositionsToUpdate()) {
+                    upsertColumns.add(tmpColumns.get(i));
+                }
+            }
+
+            Set<String> rowKeys = getRowKeyColumns() == null || getRowKeyColumns().isEmpty() ?
+                    Sets.<String>newHashSet(getUpsertColumns()) :
+                    Sets.newHashSet(getRowKeyColumns());
+
+            StringBuilder buf = new StringBuilder("UPSERT INTO ");
+            buf.append(getTargetEntity());
+            buf.append(" (").append(Joiner.on(",").join(upsertColumns)).append(") VALUES(");
+            for (int i = 0; i < upsertColumns.size(); i++) {
+                buf.append("?,");
+                if (rowKeys.contains(upsertColumns.get(i))) {
+                    rowKeyPositions.add(i);
+                }
+            }
+            buf.setCharAt(buf.length() - 1, ')');
+            LOGGER.debug (buf.toString());
+
+            Connection connection = getConnection();
+            try (PreparedStatement stmt = connection.prepareStatement(buf.toString())) {
+
+                for (int r = startRowIndex; r < startRowIndex + numRows; r++) {
+                    List<Object> upsertValues = Lists.newArrayList();
+                    List<Object> rowValues = null;
+                    rowValues = getTestDataSupplier().getValues(r);
+                    if (isFullRowUpdate) {
+                        upsertValues.addAll(rowValues);
+                    } else {
+                        for (int c : getColumnPositionsToUpdate()) {
+                            upsertValues.add(rowValues.get(c));
+                        }
+                    }
+
+                    List<String> rowKeyParts = Lists.newArrayList();
+                    for (int position : rowKeyPositions) {
+                        if (upsertValues.get(position) != null) {
+                            rowKeyParts.add(upsertValues.get(position).toString());
+                        }
+                    }
+                    String rowKey = Joiner.on("-").join(rowKeyParts);
+
+                    for (int v = 0; v < upsertValues.size(); v++) {
+                        //TODO : handle null values
+                        stmt.setObject(v + 1, upsertValues.get(v));
+                        if (upsertValues.get(v) != null) {
+                            dataTable.put(rowKey,upsertColumns.get(v), upsertValues.get(v));
+                        }
+                    }
+                    stmt.addBatch();
+                }
+                stmt.executeBatch();
+                connection.commit();
+            }
         }
+
     }
 
-    // An implementation of the TestDataWriter.
+    // An implementation of the DataWriter.
     public static class BasicDataWriter extends AbstractDataWriter {
         List<String> upsertColumns = Lists.newArrayList();
         List<Integer> columnPositionsToUpdate = Lists.newArrayList();
         DataSupplier dataSupplier;
         Connection connection;
         String targetEntity;
+        List<String> rowKeyColumns;
 
         @Override public List<String> getUpsertColumns() {
             return upsertColumns;
@@ -348,6 +546,15 @@ public class PhoenixTestBuilder {
             this.targetEntity = targetEntity;
         }
 
+        // returns the columns that is set as the pk/unique key for this data set,
+        @Override public  List<String> getRowKeyColumns() {
+            return this.rowKeyColumns;
+        }
+
+        @Override public void setRowKeyColumns(List<String> rowKeyColumns) {
+            this.rowKeyColumns = rowKeyColumns;
+        }
+
         @Override public DataSupplier getTestDataSupplier() {
             return dataSupplier;
         }
@@ -355,6 +562,56 @@ public class PhoenixTestBuilder {
         @Override public void setDataSupplier(DataSupplier dataSupplier) {
             this.dataSupplier = dataSupplier;
         }
+
+    }
+
+    /**
+     * Test SchemaBuilder defaults.
+     */
+    public static class DDLDefaults {
+        public static final int MAX_ROWS = 10000;
+        public static final List<String> TABLE_PK_TYPES = asList("CHAR(15)", "CHAR(3)");
+        public static final List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)");
+        public static final List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)");
+
+        public static final List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR", "VARCHAR");
+        public static final List<String> TABLE_COLUMNS = asList("COL1", "COL2", "COL3");
+        public static final List<String> GLOBAL_VIEW_COLUMNS = asList("COL4", "COL5", "COL6");
+        public static final List<String> TENANT_VIEW_COLUMNS = asList("COL7", "COL8", "COL9");
+
+        public static final List<String> TABLE_COLUMN_FAMILIES = asList(null, null, null);
+        public static final List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null, null, null);
+        public static final List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null, null, null);
+
+        public static final List<String> TABLE_PK_COLUMNS = asList("OID", "KP");
+        public static final List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID");
+        public static final List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID");
+
+        public static final List<String> TABLE_INDEX_COLUMNS = asList("COL1");
+        public static final List<String> TABLE_INCLUDE_COLUMNS = asList("COL3");
+
+        public static final List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4");
+        public static final List<String> GLOBAL_VIEW_INCLUDE_COLUMNS = asList("COL6");
+
+        public static final List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9");
+        public static final List<String> TENANT_VIEW_INCLUDE_COLUMNS = asList("COL7");
+
+        public static final String
+                DEFAULT_TABLE_PROPS =
+                "COLUMN_ENCODED_BYTES=0, MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'";
+        public static final String DEFAULT_TABLE_INDEX_PROPS = "";
+        public static final String DEFAULT_GLOBAL_VIEW_PROPS = "";
+        public static final String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = "";
+        public static final String DEFAULT_TENANT_VIEW_PROPS = "";
+        public static final String DEFAULT_TENANT_VIEW_INDEX_PROPS = "";
+        public static final String DEFAULT_KP = "ECZ";
+        public static final String DEFAULT_SCHEMA_NAME = "TEST_ENTITY";
+        public static final String DEFAULT_TENANT_ID_FMT = "00D0t%04d%s";
+        public static final String DEFAULT_UNIQUE_TABLE_NAME_FMT = "T_%s_%s";
+        public static final String DEFAULT_UNIQUE_GLOBAL_VIEW_NAME_FMT = "GV_%s_%s";
+
+        public static final String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost";
+
     }
 
     /**
@@ -521,12 +778,12 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE TABLE IF NOT EXISTS " +
-		// tableName +
-		// "(" +
-		// 		dataColumns +
-		// 		" CONSTRAINT pk PRIMARY KEY (" + pk + ")
-		// 	)  " +
-		// 	(dataProps.isEmpty() ? "" : dataProps;
+        // tableName +
+        // "(" +
+        // 		dataColumns +
+        // 		" CONSTRAINT pk PRIMARY KEY (" + pk + ")
+        // 	)  " +
+        // 	(dataProps.isEmpty() ? "" : dataProps;
         public SchemaBuilder withTableDefaults() {
             tableEnabled = true;
             tableCreated = false;
@@ -535,11 +792,11 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE TABLE IF NOT EXISTS " +
-		// tableName +
-		// "(" +
-		// 		dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + ")
-		// 	)  " +
-		// 	(dataProps.isEmpty() ? "" : dataProps;
+        // tableName +
+        // "(" +
+        // 		dataColumns + " CONSTRAINT pk PRIMARY KEY (" + pk + ")
+        // 	)  " +
+        // 	(dataProps.isEmpty() ? "" : dataProps;
         public SchemaBuilder withTableOptions(TableOptions options) {
             tableEnabled = true;
             tableCreated = false;
@@ -548,8 +805,8 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE VIEW IF NOT EXISTS " +
-		// globalViewName +
-		// AS SELECT * FROM " + tableName + " WHERE " + globalViewCondition;
+        // globalViewName +
+        // AS SELECT * FROM " + tableName + " WHERE " + globalViewCondition;
         public SchemaBuilder withSimpleGlobalView() {
             globalViewEnabled = true;
             globalViewCreated = false;
@@ -558,10 +815,10 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE VIEW IF NOT EXISTS " +
-		// globalViewName +
-		// "(" +
-		// 		globalViewColumns + " CONSTRAINT pk PRIMARY KEY (" + globalViewPK + ")
-		// 	) AS SELECT * FROM " + tableName + " WHERE " + globalViewCondition;
+        // globalViewName +
+        // "(" +
+        // 		globalViewColumns + " CONSTRAINT pk PRIMARY KEY (" + globalViewPK + ")
+        // 	) AS SELECT * FROM " + tableName + " WHERE " + globalViewCondition;
         public SchemaBuilder withGlobalViewDefaults() {
             globalViewEnabled = true;
             globalViewCreated = false;
@@ -570,10 +827,10 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE VIEW IF NOT EXISTS " +
-		// globalViewName +
-		// "(" +
-		// 		globalViewColumns + " CONSTRAINT pk PRIMARY KEY (" + globalViewPK + ")
-		// 	) AS SELECT * FROM " + tableName + " WHERE " + globalViewCondition;
+        // globalViewName +
+        // "(" +
+        // 		globalViewColumns + " CONSTRAINT pk PRIMARY KEY (" + globalViewPK + ")
+        // 	) AS SELECT * FROM " + tableName + " WHERE " + globalViewCondition;
         public SchemaBuilder withGlobalViewOptions(GlobalViewOptions options) {
             globalViewEnabled = true;
             globalViewCreated = false;
@@ -590,10 +847,10 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE VIEW  IF NOT EXISTS " +
-		// tenantViewName +
-		// "(" +
-		// 		tenantViewColumns + " CONSTRAINT pk PRIMARY KEY (" + tenantViewPK + ")
-		// 	) AS SELECT * FROM " + globalViewName;
+        // tenantViewName +
+        // "(" +
+        // 		tenantViewColumns + " CONSTRAINT pk PRIMARY KEY (" + tenantViewPK + ")
+        // 	) AS SELECT * FROM " + globalViewName;
         public SchemaBuilder withTenantViewDefaults() {
             tenantViewEnabled = true;
             tenantViewCreated = false;
@@ -602,10 +859,10 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE VIEW  IF NOT EXISTS " +
-		// tenantViewName +
-		// "(" +
-		// 		tenantViewColumns + " CONSTRAINT pk PRIMARY KEY (" + tenantViewPK + ")
-		// 	) AS SELECT * FROM " + globalViewName;
+        // tenantViewName +
+        // "(" +
+        // 		tenantViewColumns + " CONSTRAINT pk PRIMARY KEY (" + tenantViewPK + ")
+        // 	) AS SELECT * FROM " + globalViewName;
         public SchemaBuilder withTenantViewOptions(TenantViewOptions options) {
             tenantViewEnabled = true;
             tenantViewCreated = false;
@@ -614,8 +871,8 @@ public class PhoenixTestBuilder {
         }
 
         // "CREATE INDEX IF NOT EXISTS
-		// "IDX_T_T000001"
-		// ON "TEST_ENTITY"."T_T000001"(COL1) INCLUDE (COL3)"
+        // "IDX_T_T000001"
+        // ON "TEST_ENTITY"."T_T000001"(COL1) INCLUDE (COL3)"
         public SchemaBuilder withTableIndexDefaults() {
             tableIndexEnabled = true;
             tableIndexCreated = false;
@@ -689,9 +946,9 @@ public class PhoenixTestBuilder {
         }
 
         // Build method for creating new tenants with existing table,
-		// global and tenant view definitions.
+        // global and tenant view definitions.
         // If the tenant view definition is not changed then
-		// the same view is created with different names for different tenants.
+        // the same view is created with different names for different tenants.
         public void buildWithNewTenant() throws Exception {
             tenantViewCreated = false;
             tenantViewIndexCreated = false;
@@ -706,9 +963,9 @@ public class PhoenixTestBuilder {
         }
 
         // Build method for creating new tenant views with existing table,
-		// global and tenant view definitions.
+        // global and tenant view definitions.
         // If the tenant view definition is not changed then
-		// the same view is created with different names.
+        // the same view is created with different names.
         public void buildNewView() throws Exception {
             tenantViewCreated = false;
             tenantViewIndexCreated = false;
@@ -735,39 +992,67 @@ public class PhoenixTestBuilder {
                 this.connectOptions = new ConnectOptions();
             }
 
+            if (this.globalViewOptions == null) {
+                this.globalViewOptions = new GlobalViewOptions();
+            }
+
+            if (this.globalViewIndexOptions == null) {
+                this.globalViewIndexOptions = new GlobalViewIndexOptions();
+            }
+
+            if (this.tenantViewOptions == null) {
+                this.tenantViewOptions = new TenantViewOptions();
+            }
+
+            if (this.tenantViewIndexOptions == null) {
+                this.tenantViewIndexOptions = new TenantViewIndexOptions();
+            }
+
             if (connectOptions.useGlobalConnectionOnly
                     && connectOptions.useTenantConnectionForGlobalView) {
                 throw new IllegalArgumentException(
-                		"useTenantConnectionForGlobalView and useGlobalConnectionOnly both cannot be true");
+                        "useTenantConnectionForGlobalView and useGlobalConnectionOnly both cannot be true");
+            }
+
+            String tableName = SchemaUtil.normalizeIdentifier(dataOptions.getTableName());
+            String globalViewName = SchemaUtil.normalizeIdentifier(dataOptions.getGlobalViewName());
+            String tableSchemaNameToUse = tableOptions.getSchemaName();
+            String globalViewSchemaNameToUse = globalViewOptions.getSchemaName();
+            String tenantViewSchemaNameToUse = tenantViewOptions.getSchemaName();
+
+            // If schema name is overridden by specifying it in data options then use it.
+            if ((dataOptions.getSchemaName() != null) && (!dataOptions.getSchemaName().isEmpty())) {
+                tableSchemaNameToUse = dataOptions.getSchemaName();
+                globalViewSchemaNameToUse = dataOptions.getSchemaName();
+                tenantViewSchemaNameToUse = dataOptions.getSchemaName();
             }
 
-            String tableName = SchemaUtil.getEscapedArgument("T_" + dataOptions.uniqueName);
-            String globalViewName = SchemaUtil.getEscapedArgument("V_" + dataOptions.uniqueName);
             String
                     tableSchemaName =
-                    tableEnabled ? SchemaUtil.getEscapedArgument(tableOptions.schemaName) : "";
+                    tableEnabled ? SchemaUtil.normalizeIdentifier(tableSchemaNameToUse) : "";
             String
                     globalViewSchemaName =
                     globalViewEnabled ?
-                            SchemaUtil.getEscapedArgument(globalViewOptions.schemaName) :
+                            SchemaUtil.normalizeIdentifier(globalViewSchemaNameToUse) :
                             "";
             String
                     tenantViewSchemaName =
                     tenantViewEnabled ?
-                            SchemaUtil.getEscapedArgument(tenantViewOptions.schemaName) :
+                            SchemaUtil.normalizeIdentifier(tenantViewSchemaNameToUse) :
                             "";
             entityTableName = SchemaUtil.getTableName(tableSchemaName, tableName);
             entityGlobalViewName = SchemaUtil.getTableName(globalViewSchemaName, globalViewName);
 
             // Derive the keyPrefix to use.
-            entityKeyPrefix =
+            entityKeyPrefix = dataOptions.getKeyPrefix() != null && !dataOptions.getKeyPrefix().isEmpty()?
+                    dataOptions.getKeyPrefix() :
                     connectOptions.useGlobalConnectionOnly ?
                             (String.format("Z%02d", dataOptions.getViewNumber())) :
                             (tenantViewEnabled && !globalViewEnabled ?
                                     (String.format("Z%02d", dataOptions.getViewNumber())) :
                                     DDLDefaults.DEFAULT_KP);
 
-            String tenantViewName = SchemaUtil.getEscapedArgument(entityKeyPrefix);
+            String tenantViewName = SchemaUtil.normalizeIdentifier(entityKeyPrefix);
             entityTenantViewName = SchemaUtil.getTableName(tenantViewSchemaName, tenantViewName);
             String globalViewCondition = String.format("KP = '%s'", entityKeyPrefix);
 
@@ -787,7 +1072,7 @@ public class PhoenixTestBuilder {
                 if (tableIndexEnabled && !tableIndexCreated) {
                     String
                             indexOnTableName =
-                            SchemaUtil.getEscapedArgument(String.format("IDX_%s",
+                            SchemaUtil.normalizeIdentifier(String.format("IDX_%s",
                                     SchemaUtil.normalizeIdentifier(tableName)));
                     globalConnection.createStatement().execute(
                             buildCreateIndexStmt(indexOnTableName, entityTableName,
@@ -862,15 +1147,15 @@ public class PhoenixTestBuilder {
             statement.append(isLocal ?
                     "CREATE LOCAL INDEX IF NOT EXISTS " :
                     "CREATE INDEX IF NOT EXISTS ")
-					.append(indexName)
-					.append(" ON ")
+                    .append(indexName)
+                    .append(" ON ")
                     .append(onEntityName)
-					.append("(")
-					.append(Joiner.on(",").join(indexColumns))
+                    .append("(")
+                    .append(Joiner.on(",").join(indexColumns))
                     .append(") ")
-					.append(includeColumns.isEmpty() ?
-						"" :
-						"INCLUDE (" + Joiner.on(",").join(includeColumns) + ") ")
+                    .append(includeColumns.isEmpty() ?
+                            "" :
+                            "INCLUDE (" + Joiner.on(",").join(includeColumns) + ") ")
                     .append((indexProps.isEmpty() ? "" : indexProps));
 
             LOGGER.info(statement.toString());
@@ -1549,10 +1834,15 @@ public class PhoenixTestBuilder {
 
         public static class DataOptions {
             String uniqueName = "";
+            String uniqueNamePrefix = "";
             String tenantIdFormat = DDLDefaults.DEFAULT_TENANT_ID_FMT;
+            String keyPrefix = "";
             int viewNumber = 0;
             AtomicInteger viewCounter = new AtomicInteger(0);
             String tenantId = "";
+            String schemaName = DDLDefaults.DEFAULT_SCHEMA_NAME;
+            String tableName = "";
+            String globalViewName = "";
 
             /*
              *****************************
@@ -1562,11 +1852,19 @@ public class PhoenixTestBuilder {
 
             public static DataOptions withDefaults() {
                 DataOptions options = new DataOptions();
-                options.uniqueName = generateUniqueName();
+                options.uniqueName = generateUniqueName().substring(1);
                 options.viewCounter = new AtomicInteger(0);
                 options.tenantId =
                         String.format(options.tenantIdFormat, TENANT_COUNTER.get(),
                                 options.uniqueName);
+                options.tableName =
+                        String.format("%s%s",
+                                options.uniqueNamePrefix.isEmpty() ? "T_" : options.uniqueNamePrefix,
+                                options.uniqueName);
+                options.globalViewName =
+                        String.format("%s%s",
+                                options.uniqueNamePrefix.isEmpty() ? "GV_" : options.uniqueNamePrefix,
+                                options.uniqueName);
                 return options;
             }
 
@@ -1601,6 +1899,58 @@ public class PhoenixTestBuilder {
             public void setTenantId(String tenantId) {
                 this.tenantId = tenantId;
             }
+
+            public String getUniqueNamePrefix() {
+                return uniqueNamePrefix;
+            }
+
+            public void setUniqueNamePrefix(String uniqueNamePrefix) {
+                this.uniqueNamePrefix = uniqueNamePrefix;
+            }
+
+            public String getKeyPrefix() {
+                return keyPrefix;
+            }
+
+            public void setKeyPrefix(String keyPrefix) {
+                this.keyPrefix = keyPrefix;
+            }
+
+            public void setViewNumber(int viewNumber) {
+                this.viewNumber = viewNumber;
+            }
+
+            public AtomicInteger getViewCounter() {
+                return viewCounter;
+            }
+
+            public void setViewCounter(AtomicInteger viewCounter) {
+                this.viewCounter = viewCounter;
+            }
+
+            public String getTableName() {
+                return tableName;
+            }
+
+            public void setTableName(String tableName) {
+                this.tableName = tableName;
+            }
+
+            public String getGlobalViewName() {
+                return globalViewName;
+            }
+
+            public void setGlobalViewName(String globalViewName) {
+                this.globalViewName = globalViewName;
+            }
+
+            public String getSchemaName() {
+                return schemaName;
+            }
+
+            public void setSchemaName(String schemaName) {
+                this.schemaName = schemaName;
+            }
         }
     }
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
index a8e7354..14b2d5e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
@@ -17,17 +17,24 @@
  */
 package org.apache.phoenix.util;
 
-import static org.junit.Assert.assertArrayEquals;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
 import org.apache.phoenix.schema.SortOrder;
@@ -36,14 +43,23 @@ import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertArrayEquals;
 
 
 /**
@@ -466,4 +482,78 @@ public class ScanUtilTest {
             assertArrayEquals(expectedEndKey, endKey);
         }
     }
+
+    public static class PhoenixTTLScanUtilTest extends BaseConnectionlessQueryTest {
+
+        @Test
+        public void testPhoenixTTLUtilMethods() throws SQLException {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            try (Connection conn = driver.connect(getUrl(), props)) {
+                PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+                PTable table = phxConn.getTable(new PTableKey(null, ATABLE_NAME));
+
+                byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+                byte[] emptyColumnName = table.getEncodingScheme()
+                        == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                        QueryConstants.EMPTY_COLUMN_BYTES :
+                        table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+                String row = "test.row";
+                long timestamp42 = 42L;
+                KeyValue.Type type42 = KeyValue.Type.Put;
+                String value42 = "test.value.42";
+                long seqId42 = 1042L;
+
+                List<Cell> cellList = Lists.newArrayList();
+                Cell cell42 = CellUtil.createCell(Bytes.toBytes(row),
+                        emptyColumnFamilyName, emptyColumnName,
+                        timestamp42, type42.getCode(), Bytes.toBytes(value42), seqId42);
+                // Add cell to the cell list
+                cellList.add(cell42);
+
+                long timestamp43 = 43L;
+                String columnName = "test_column";
+                KeyValue.Type type43 = KeyValue.Type.Put;
+                String value43 = "test.value.43";
+                long seqId43 = 1043L;
+                Cell cell43 = CellUtil.createCell(Bytes.toBytes(row),
+                        emptyColumnFamilyName, Bytes.toBytes(columnName),
+                        timestamp43, type43.getCode(), Bytes.toBytes(value43), seqId43);
+                // Add cell to the cell list
+                cellList.add(cell43);
+
+                long timestamp44 = 44L;
+                Scan testScan = new Scan();
+                testScan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(1L));
+                // Test isTTLExpired
+                Assert.assertTrue(ScanUtil.isTTLExpired(cell42, testScan, timestamp44));
+                Assert.assertFalse(ScanUtil.isTTLExpired(cell43, testScan, timestamp44));
+                // Test isEmptyColumn
+                Assert.assertTrue(ScanUtil.isEmptyColumn(cell42, emptyColumnFamilyName, emptyColumnName));
+                Assert.assertFalse(ScanUtil.isEmptyColumn(cell43, emptyColumnFamilyName, emptyColumnName));
+                // Test getMaxTimestamp
+                Assert.assertEquals(timestamp43, ScanUtil.getMaxTimestamp(cellList));
+            }
+        }
+
+        @Test
+        public void testIsServerSideMaskingPropertySet() throws Exception {
+            // Test property is not set
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            props.setProperty(QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, "false");
+            PhoenixTestDriver driver1 = new PhoenixTestDriver(ReadOnlyProps.EMPTY_PROPS.addAll(props));
+            try (Connection conn = driver1.connect(getUrl(), props)) {
+                PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+                Assert.assertFalse(ScanUtil.isServerSideMaskingEnabled(phxConn));
+            }
+
+            // Test property is set
+            props.setProperty(QueryServices.PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED, "true");
+            PhoenixTestDriver driver2 = new PhoenixTestDriver(ReadOnlyProps.EMPTY_PROPS.addAll(props));
+            try (Connection conn = driver2.connect(getUrl(), props)) {
+                PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+                Assert.assertTrue(ScanUtil.isServerSideMaskingEnabled(phxConn));
+            }
+        }
+    }
 }