You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/12/20 22:58:51 UTC

[phoenix] branch view-ttl updated: PHOENIX-5601 Add a new Coprocessor - ViewTTLAware Coprocessor

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch view-ttl
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/view-ttl by this push:
     new 2237beb  PHOENIX-5601 Add a new Coprocessor - ViewTTLAware Coprocessor
2237beb is described below

commit 2237beba0069119a800fb7ee7d88282cc4b10fbb
Author: Jacob Isaac <ji...@salesforce.com>
AuthorDate: Fri Nov 15 08:58:15 2019 -0800

    PHOENIX-5601 Add a new Coprocessor - ViewTTLAware Coprocessor
    
    Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java | 1054 +++++++++++++++++++-
 .../coprocessor/BaseScannerRegionObserver.java     |   38 +-
 .../coprocessor/TTLAwareRegionObserver.java        |  336 +++++++
 .../phoenix/iterate/TableResultIterator.java       |    3 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |   10 +
 .../java/org/apache/phoenix/util/IndexUtil.java    |   93 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  147 +++
 .../apache/phoenix/query/PhoenixTestBuilder.java   |  338 ++++++-
 8 files changed, 1837 insertions(+), 182 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index ee77b33..18a2fa8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -18,6 +18,8 @@
 
 package org.apache.phoenix.end2end;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
@@ -31,19 +33,32 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SubstringComparator;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
+import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataReader;
+import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataWriter;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataReader;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataWriter;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataSupplier;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.OtherOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -51,20 +66,26 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ViewTTLIT extends ParallelStatsDisabledIT {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLIT.class);
-    private static final String ORG_ID_FMT = "00D0x000%s";
-    private static final String ID_FMT = "00A0y000%07d";
     private static final String VIEW_TTL_HEADER_SQL =  "SELECT VIEW_TTL FROM SYSTEM.CATALOG "
             + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = '%s'";
 
     private static final String ALTER_VIEW_TTL_SQL = "ALTER VIEW %s.%s set VIEW_TTL=%d";
+    private static final int DEFAULT_NUM_ROWS = 5;
 
     // Scans the HBase rows directly for the view ttl related header rows column and asserts
     private void assertViewHeaderRowsHaveViewTTLRelatedCells(String schemaName, long minTimestamp,
@@ -96,7 +117,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
             assertEquals(String.format("Expected rows do not match for table = %s at timestamp %d",
                     Bytes.toString(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES), minTimestamp), expectedRows, numMatchingRows);
         }
-
     }
 
     private void assertSyscatHaveViewTTLRelatedColumns(String tenantId, String schemaName, String tableName, String tableType, long ttlValueExpected)
@@ -119,7 +139,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         return name.replace("\"", "");
     }
 
-
     /**
      * -----------------
      * Test methods
@@ -134,7 +153,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // Define the test schema.
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
         // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
-        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
         schemaBuilder
                 .withTableDefaults()
                 .withGlobalViewDefaults()
@@ -145,15 +164,13 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, true, 2);
     }
 
-
-
     @Test
     public void testViewTTLWithTableLevelTTLFails() throws Exception {
 
         // Define the test schema.
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
         // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
-        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
 
         TableOptions tableOptions = TableOptions.withDefaults();
         tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true,TTL=100");
@@ -177,7 +194,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // Define the test schema.
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
         // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
-        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
 
         TableOptions tableOptions = TableOptions.withDefaults();
         tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -203,7 +220,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // Define the test schema.
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
         // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
-        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
 
         TableOptions tableOptions = TableOptions.withDefaults();
         tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -238,18 +255,18 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
         // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
         // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
-        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
 
         TableOptions tableOptions = TableOptions.withDefaults();
         tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
 
-        PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions
-                globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+        SchemaBuilder.GlobalViewOptions
+                globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
         globalViewOptions.setTableProps("VIEW_TTL=300000");
 
-        PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions
+        SchemaBuilder.GlobalViewIndexOptions
                 globalViewIndexOptions =
-                PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
         globalViewIndexOptions.setLocal(false);
 
         TenantViewOptions tenantViewWithOverrideOptions = TenantViewOptions.withDefaults();
@@ -274,7 +291,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 4);
         assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000);
         assertSyscatHaveViewTTLRelatedColumns("", schemaName, indexOnGlobalViewName, PTableType.INDEX.getSerializedValue(), 300000);
-        // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
+        // Since the VIEW_TTL property values are being overridden, we expect the TTL value to be different from the global view.
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
 
@@ -315,7 +332,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // Define the test schema.
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
         // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
-        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
 
         TableOptions tableOptions = TableOptions.withDefaults();
         tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -341,7 +358,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 0);
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 0);
-
     }
 
     @Test
@@ -351,7 +367,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // Define the test schema.
         // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
         // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
-        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
 
         TableOptions tableOptions = TableOptions.withDefaults();
         tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -391,7 +407,997 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
         // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
+    }
+
+    @Test
+    public void testViewTTLForLevelTwoViewWithNoIndexes() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        SchemaBuilder.GlobalViewOptions
+                globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps("VIEW_TTL=300000");
+
+        TenantViewOptions tenantViewWithOverrideOptions = TenantViewOptions.withDefaults();
+        tenantViewWithOverrideOptions.setTableProps("VIEW_TTL=10000");
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withTenantViewOptions(tenantViewWithOverrideOptions)
+                .buildWithNewTenant();
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        String tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+        // Expected 2 rows - one each for the GlobalView and the TenantView.
+        // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 2);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are being overridden, we expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 10000);
+
+        // Without override
+        startTime = System.currentTimeMillis();
+
+        TenantViewOptions tenantViewWithoutOverrideOptions = TenantViewOptions.withDefaults();
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withTenantViewOptions(tenantViewWithoutOverrideOptions)
+                .buildWithNewTenant();
+
+        tenantId = schemaBuilder.getDataOptions().getTenantId();
+        schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        globalViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+        // Expected 1 row - one for the TenantView.
+        // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 1);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are not being overridden, we expect the TTL value to be the same as the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 300000);
+    }
+
+    @Test
+    public void testWithTenantViewAndNoGlobalView() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .build();
+
+        // Define the test data.
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format("00A0y000%07d", rowIndex);
+                String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                return Lists.newArrayList(new Object[] { zid, col7, col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ZID");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            dataReader.setValidationColumns(columns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT %s from %s",
+                    Joiner.on(",").join(columns),
+                    schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, dataWriter,
+                    dataReader, schemaBuilder);
+        }
+    }
+
+    @Test public void testWithSQLUsingIndex() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
 
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null, null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+            String col4ForWhereClause;
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format("00A0y000%07d", rowIndex);
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'",
+                    schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1)));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder);
+        }
     }
 
+    @Test public void testWithVariousSQLs() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null, null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final String groupById = String.format("00A0y000%07d", 0);
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { groupById, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            // Case : count(1) sql
+            DataReader dataReader = new BasicDataReader();
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String.format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                    schemaBuilder.getEntityTenantViewName()));
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder);
+
+            // Case : group by sql
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String.format("SELECT count(1) as num_rows from %s GROUP BY ID HAVING count(1) > 0",
+                    schemaBuilder.getEntityTenantViewName(),
+                    groupById));
+
+            dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder);
+        }
+    }
+
+    /**
+     * Exercises VIEW_TTL on a tenant view layered over a global view, with the same VIEW_TTL
+     * declared on both views. The schema is rebuilt for each of two additional table properties
+     * and for every combination of the setLocal flag on the global and tenant view index
+     * options. For each schema the test upserts rows, validates them before and after the TTL
+     * window, issues the delete-expired scan and finally checks that no rows are fetched.
+     *
+     * @throws Exception if building the schema, writing or reading the data fails
+     */
+    @Test
+    public void testWithTenantViewAndGlobalViewAndVariousOptions() throws Exception {
+        long viewTTL = 10000;
+
+        // Define the test schema
+        TableOptions tableOptions = TableOptions.withDefaults();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                GlobalViewIndexOptions.withDefaults();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        TenantViewIndexOptions
+                tenantViewIndexOptions =
+                TenantViewIndexOptions.withDefaults();
+
+        // No explicit column families are requested for the table or either view.
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null, null, null, null));
+
+        for (String additionalProps : Lists
+                .newArrayList("COLUMN_ENCODED_BYTES=0", "DEFAULT_COLUMN_FAMILY='0'")) {
+
+            String withTableProps = "MULTI_TENANT=true," + additionalProps;
+
+            for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) {
+                for (boolean isTenantViewLocal : Lists.newArrayList(true, false)) {
+
+                    tableOptions.setTableProps(withTableProps);
+                    globalViewIndexOptions.setLocal(isGlobalViewLocal);
+                    tenantViewIndexOptions.setLocal(isTenantViewLocal);
+
+                    // A fresh builder (and tenant) is used for every option combination.
+                    final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+                    schemaBuilder
+                            .withTableOptions(tableOptions)
+                            .withGlobalViewOptions(globalViewOptions)
+                            .withGlobalViewIndexOptions(globalViewIndexOptions)
+                            .withTenantViewOptions(tenantViewOptions)
+                            .withTenantViewIndexOptions(tenantViewIndexOptions)
+                            .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                            .buildWithNewTenant();
+
+                    // Define the test data.
+                    DataSupplier dataSupplier = new DataSupplier() {
+
+                        @Override public List<Object> getValues(int rowIndex) {
+                            Random rnd = new Random();
+                            // ID and ZID are derived from the row index only; the remaining
+                            // columns get a randomized numeric suffix.
+                            String id = String.format("00A0y000%07d", rowIndex);
+                            String zid = String.format("00B0y000%07d", rowIndex);
+                            String col1 = String.format("a%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col2 = String.format("b%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col3 = String.format("c%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+                            String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS));
+
+                            return Lists.newArrayList(
+                                    new Object[] { id, zid, col1, col2, col3, col4, col5, col6, col7,
+                                            col8, col9 });
+                        }
+                    };
+
+                    // Create a test data reader/writer for the above schema.
+                    DataWriter dataWriter = new BasicDataWriter();
+                    DataReader dataReader = new BasicDataReader();
+
+                    List<String> columns = Lists.newArrayList("ID", "ZID", "COL1", "COL2",
+                            "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+                    List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+                    String
+                            tenantConnectUrl =
+                            getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder
+                                    .getDataOptions().getTenantId();
+                    try (Connection writeConnection = DriverManager
+                            .getConnection(tenantConnectUrl)) {
+                        writeConnection.setAutoCommit(true);
+                        dataWriter.setConnection(writeConnection);
+                        dataWriter.setDataSupplier(dataSupplier);
+                        dataWriter.setUpsertColumns(columns);
+                        dataWriter.setRowKeyColumns(rowKeyColumns);
+                        dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                        dataReader.setValidationColumns(columns);
+                        dataReader.setRowKeyColumns(rowKeyColumns);
+                        dataReader.setDML(String.format("SELECT %s from %s",
+                                Joiner.on(",").join(columns),
+                                schemaBuilder.getEntityTenantViewName()));
+                        dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                        // Validate data before and after ttl expiration.
+                        upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS,
+                                dataWriter, dataReader, schemaBuilder);
+                    }
+
+
+                    // An SCN one TTL period ahead of now, i.e. past the TTL of the rows
+                    // upserted above.
+                    long scnTimestamp = System.currentTimeMillis()+viewTTL;
+                    // Delete data by simulating expiration.
+                    deleteData(schemaBuilder, scnTimestamp);
+
+                    // Verify after deleting TTL expired data.
+                    Properties props = new Properties();
+                    props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+                    try (Connection readConnection = DriverManager
+                            .getConnection(tenantConnectUrl, props)) {
+
+                        dataReader.setConnection(readConnection);
+                        com.google.common.collect.Table<String, String, Object> fetchedData =
+                                fetchData(dataReader);
+                        assertTrue("Expired rows should not be fetched",
+                                fetchedData.rowKeySet().size() == 0);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks which VIEW_TTL applies to a tenant view created on top of a global view.
+     * Case 1 gives the tenant view its own VIEW_TTL and validates against that value;
+     * case 2 leaves the tenant view without a VIEW_TTL and validates against the value
+     * declared on the global view.
+     */
+    @Test
+    public void testGlobalAndTenantViewTTLInheritance() throws Exception {
+        long globalViewTTL = 300000;
+        long tenantViewTTL = 30000;
+
+        // Schema under test:
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final SchemaBuilder builder = new SchemaBuilder(getUrl());
+
+        TableOptions baseTableOptions = TableOptions.withDefaults();
+        baseTableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        SchemaBuilder.GlobalViewOptions globalView =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalView.setTableProps(String.format("VIEW_TTL=%d", globalViewTTL));
+
+        SchemaBuilder.GlobalViewIndexOptions globalViewIndex =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndex.setLocal(false);
+
+        TenantViewOptions overridingTenantView = new TenantViewOptions();
+        overridingTenantView.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        overridingTenantView.setTenantViewColumnTypes(
+                asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+        overridingTenantView.setTableProps(String.format("VIEW_TTL=%d", tenantViewTTL));
+
+        OtherOptions allDefaultCFs = new OtherOptions();
+        allDefaultCFs.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        allDefaultCFs.setTableCFs(Lists.newArrayList((String) null, null, null));
+        allDefaultCFs.setGlobalViewCFs(Lists.newArrayList((String) null, null, null));
+        allDefaultCFs.setTenantViewCFs(Lists.newArrayList((String) null, null, null, null));
+
+        // ------------------------------------------------------------
+        // Case 1: the tenant view overrides the TTL.
+        //   TTL for GLOBAL_VIEW - 300000
+        //   TTL for TENANT_VIEW - 30000
+        // ------------------------------------------------------------
+        builder.withTableOptions(baseTableOptions)
+                .withGlobalViewOptions(globalView)
+                .withGlobalViewIndexOptions(globalViewIndex)
+                .withTenantViewOptions(overridingTenantView)
+                .withTenantViewIndexDefaults()
+                .withOtherOptions(allDefaultCFs)
+                .buildWithNewTenant();
+
+        // Test data: every row shares one ID; ZID follows the row index.
+        final String id = String.format("00A0y000%07d", 0);
+        DataSupplier rowSupplier = new DataSupplier() {
+
+            @Override
+            public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                List<Object> row = Lists.newArrayList();
+                row.add(id);
+                row.add(String.format("00B0y000%07d", rowIndex));
+                // COL1..COL9 use the prefixes a..i followed by a randomized 5-digit number.
+                for (char prefix = 'a'; prefix <= 'i'; prefix++) {
+                    row.add(String.format(prefix + "%05d", rowIndex + rnd.nextInt(MAX_ROWS)));
+                }
+                return row;
+            }
+        };
+
+        // One writer is shared by both cases; each case creates its own reader.
+        DataWriter writer = new BasicDataWriter();
+        List<String> upsertColumns = Lists.newArrayList("ID", "ZID",
+                "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> pkColumns = Lists.newArrayList("ID", "ZID");
+
+        String overrideTenantUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + builder.getDataOptions().getTenantId();
+        try (Connection overrideConnection = DriverManager.getConnection(overrideTenantUrl)) {
+            overrideConnection.setAutoCommit(true);
+            writer.setConnection(overrideConnection);
+            writer.setDataSupplier(rowSupplier);
+            writer.setUpsertColumns(upsertColumns);
+            writer.setRowKeyColumns(pkColumns);
+            writer.setTargetEntity(builder.getEntityTenantViewName());
+            upsertData(writer, DEFAULT_NUM_ROWS);
+
+            // Reader issuing a count(1) query against the tenant view.
+            DataReader countReader = new BasicDataReader();
+            countReader.setValidationColumns(Arrays.asList("num_rows"));
+            countReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            countReader.setDML(
+                    String.format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                            builder.getEntityTenantViewName()));
+            countReader.setTargetEntity(builder.getEntityTenantViewName());
+
+            // The TTL set on the tenant view is the one expected to apply.
+            validateExpiredRowsAreNotReturnedUsingCounts(tenantViewTTL, countReader, builder);
+        }
+
+        // ------------------------------------------------------------
+        // Case 2: the tenant view does NOT override the TTL.
+        //   TTL for GLOBAL_VIEW - 300000
+        //   TTL for TENANT_VIEW - 300000
+        // ------------------------------------------------------------
+        TenantViewOptions inheritingTenantView = new TenantViewOptions();
+        inheritingTenantView.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        inheritingTenantView.setTenantViewColumnTypes(
+                asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR"));
+
+        builder.withTableOptions(baseTableOptions)
+                .withGlobalViewOptions(globalView)
+                .withGlobalViewIndexOptions(globalViewIndex)
+                .withTenantViewOptions(inheritingTenantView)
+                .withTenantViewIndexDefaults()
+                .withOtherOptions(allDefaultCFs)
+                .buildWithNewTenant();
+
+        String inheritTenantUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + builder.getDataOptions().getTenantId();
+        try (Connection inheritConnection = DriverManager.getConnection(inheritTenantUrl)) {
+            inheritConnection.setAutoCommit(true);
+            writer.setConnection(inheritConnection);
+            writer.setDataSupplier(rowSupplier);
+            writer.setUpsertColumns(upsertColumns);
+            writer.setRowKeyColumns(pkColumns);
+            writer.setTargetEntity(builder.getEntityTenantViewName());
+            upsertData(writer, DEFAULT_NUM_ROWS);
+
+            // Reader issuing a count(1) query against the tenant view.
+            DataReader countReader = new BasicDataReader();
+            countReader.setValidationColumns(Arrays.asList("num_rows"));
+            countReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            countReader.setDML(
+                    String.format("SELECT count(1) as num_rows from %s HAVING count(1) > 0",
+                            builder.getEntityTenantViewName()));
+            countReader.setTargetEntity(builder.getEntityTenantViewName());
+
+            // Halfway through the global TTL the rows must still be visible.
+            long halfwayScn = System.currentTimeMillis() + globalViewTTL/2;
+            validateRowsAreNotMaskedUsingCounts(halfwayScn, countReader, builder);
+            // The global view TTL is the one expected to apply, since the tenant view
+            // did not declare its own.
+            validateExpiredRowsAreNotReturnedUsingCounts(globalViewTTL, countReader, builder);
+        }
+    }
+
+    /**
+     * Runs the delete-expired scan against a tenant view for which only table options and
+     * tenant view options are configured, then checks that no rows are fetched afterwards.
+     */
+    @Test
+    public void testDeleteIfExpiredOnTenantView() throws Exception {
+        long viewTTL = 180000;
+
+        // The table's column list and column types are cleared; the tenant view carries the TTL.
+        TableOptions baseTableOptions = TableOptions.withDefaults();
+        baseTableOptions.getTableColumns().clear();
+        baseTableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions ttlTenantView = TenantViewOptions.withDefaults();
+        ttlTenantView.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        // Build the schema.
+        final SchemaBuilder builder = new SchemaBuilder(getUrl());
+        builder.withTableOptions(baseTableOptions)
+                .withTenantViewOptions(ttlTenantView)
+                .build();
+
+        // Row generator: ZID follows the row index, COL7..COL9 get randomized suffixes.
+        DataSupplier rowSupplier = new DataSupplier() {
+
+            @Override
+            public List<Object> getValues(int rowIndex) {
+                Random random = new Random();
+                Object[] row = new Object[4];
+                row[0] = String.format("00A0y000%07d", rowIndex);
+                row[1] = String.format("g%05d", rowIndex + random.nextInt(MAX_ROWS));
+                row[2] = String.format("h%05d", rowIndex + random.nextInt(MAX_ROWS));
+                row[3] = String.format("i%05d", rowIndex + random.nextInt(MAX_ROWS));
+                return Lists.newArrayList(row);
+            }
+        };
+
+        // Reader and writer both target the tenant view.
+        DataWriter writer = new BasicDataWriter();
+        DataReader reader = new BasicDataReader();
+
+        List<String> viewColumns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9");
+        List<String> pkColumns = Lists.newArrayList("ZID");
+        String tenantUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + builder.getDataOptions().getTenantId();
+
+        try (Connection upsertConnection = DriverManager.getConnection(tenantUrl)) {
+            upsertConnection.setAutoCommit(true);
+
+            writer.setConnection(upsertConnection);
+            writer.setDataSupplier(rowSupplier);
+            writer.setUpsertColumns(viewColumns);
+            writer.setRowKeyColumns(pkColumns);
+            writer.setTargetEntity(builder.getEntityTenantViewName());
+
+            reader.setValidationColumns(viewColumns);
+            reader.setRowKeyColumns(pkColumns);
+            reader.setDML(String.format("SELECT %s from %s",
+                    Joiner.on(",").join(viewColumns),
+                    builder.getEntityTenantViewName()));
+            reader.setTargetEntity(builder.getEntityTenantViewName());
+
+            // Upsert, then check visibility before and after the TTL window.
+            upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, writer, reader, builder);
+        }
+
+        // Issue the delete-expired scan as of an SCN one TTL period ahead of now.
+        long expiredScn = System.currentTimeMillis() + viewTTL;
+        deleteData(builder, expiredScn);
+
+        // Read back at the same SCN; nothing should be returned.
+        Properties scnProps = new Properties();
+        scnProps.setProperty("CurrentSCN", Long.toString(expiredScn));
+        try (Connection scnConnection = DriverManager.getConnection(tenantUrl, scnProps)) {
+            reader.setConnection(scnConnection);
+            com.google.common.collect.Table<String, String, Object> remaining =
+                    fetchData(reader);
+            assertTrue("Expired rows should not be fetched",
+                    remaining.rowKeySet().isEmpty());
+        }
+    }
+
+    /**
+     * Upserts {@code numRowsToUpsert} rows twice through {@code dataWriter} and, after each
+     * upsert, validates the written rows with
+     * {@code validateExpiredRowsAreNotReturnedUsingData}.
+     */
+    private void upsertDataAndRunValidations(long viewTTL, int numRowsToUpsert,
+            DataWriter dataWriter, DataReader dataReader,
+            SchemaBuilder schemaBuilder)
+            throws IOException, SQLException {
+
+        // Pass 0 inserts the rows for the first time; pass 1 updates the same rows.
+        for (int pass = 0; pass < 2; pass++) {
+            com.google.common.collect.Table<String, String, Object> written =
+                    upsertData(dataWriter, numRowsToUpsert);
+            validateExpiredRowsAreNotReturnedUsingData(viewTTL, written, dataReader,
+                    schemaBuilder);
+        }
+    }
+
+    private void validateExpiredRowsAreNotReturnedUsingCounts(
+            long viewTTL,
+            DataReader dataReader,
+            SchemaBuilder schemaBuilder) throws SQLException {
+
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+
+        // Verify before TTL expiration
+        try (Connection readConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData =
+                    fetchData(dataReader);
+            assertTrue("Fetched data should not be null",
+                    fetchedData != null);
+            assertTrue("Rows should exists before expiration",
+                    fetchedData.rowKeySet().size() > 0);
+        }
+
+        // Verify after TTL expiration
+        long scnTimestamp = System.currentTimeMillis();
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp+(2*viewTTL)));
+        try (Connection readConnection = DriverManager
+                .getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData =
+                    fetchData(dataReader);
+            assertTrue("Fetched data should not be null",
+                    fetchedData != null);
+            assertTrue("Expired rows should not be fetched",
+                    fetchedData.rowKeySet().size() == 0);
+        }
+    }
+
+    private void validateExpiredRowsAreNotReturnedUsingData(
+            long viewTTL,
+            com.google.common.collect.Table<String, String, Object> upsertedData,
+            DataReader dataReader,
+            SchemaBuilder schemaBuilder) throws SQLException {
+
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+
+        // Verify before TTL expiration
+        try (Connection readConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData =
+                    fetchData(dataReader);
+            assertTrue("Upserted data should not be null",
+                    upsertedData != null);
+            assertTrue("Fetched data should not be null",
+                    fetchedData != null);
+
+            verifyRowsBeforeTTLExpiration(upsertedData, fetchedData);
+        }
+
+        // Verify after TTL expiration
+        long scnTimestamp = System.currentTimeMillis();
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp+(2*viewTTL)));
+        try (Connection readConnection = DriverManager
+                .getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData =
+                    fetchData(dataReader);
+            assertTrue("Fetched data should not be null",
+                    fetchedData != null);
+            assertTrue("Expired rows should not be fetched",
+                    fetchedData.rowKeySet().size() == 0);
+        }
+
+    }
+
+    private void validateRowsAreNotMaskedUsingCounts(
+            long probeTimestamp,
+            DataReader dataReader,
+            SchemaBuilder schemaBuilder) throws SQLException {
+
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+
+        // Verify rows exists (not masked) at current time
+        try (Connection readConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData =
+                    fetchData(dataReader);
+            assertTrue("Fetched data should not be null",
+                    fetchedData != null);
+            assertTrue("Rows should exists before ttl expiration (now)",
+                    fetchedData.rowKeySet().size() > 0);
+        }
+
+        // Verify rows exists (not masked) at probed timestamp
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(probeTimestamp));
+        try (Connection readConnection = DriverManager
+                .getConnection(tenantConnectUrl, props)) {
+
+            dataReader.setConnection(readConnection);
+            com.google.common.collect.Table<String, String, Object> fetchedData =
+                    fetchData(dataReader);
+            assertTrue("Fetched data should not be null",
+                    fetchedData != null);
+            assertTrue("Rows should exists before ttl expiration (probe-timestamp)",
+                    fetchedData.rowKeySet().size() > 0);
+        }
+    }
+
+    /**
+     * Asserts that the fetched table has exactly the row keys of the upserted table and that,
+     * for every fetched row key and fetched column key, both tables hold equal non-null values.
+     *
+     * @param upsertedData the rows that were written
+     * @param fetchedData the rows that were read back
+     */
+    private void verifyRowsBeforeTTLExpiration(
+            com.google.common.collect.Table<String, String, Object> upsertedData,
+            com.google.common.collect.Table<String, String, Object> fetchedData) {
+
+        // Row-level check: both tables must contain the same set of row keys.
+        Set<String> expectedKeys = upsertedData.rowKeySet();
+        Set<String> actualKeys = fetchedData.rowKeySet();
+        assertTrue("Upserted row keys should not be null", expectedKeys != null);
+        assertTrue("Fetched row keys should not be null", actualKeys != null);
+        assertTrue("Rows upserted and fetched do not match", expectedKeys.equals(actualKeys));
+
+        // Cell-level check, driven by the columns present in the fetched table.
+        Set<String> actualColumns = fetchedData.columnKeySet();
+        for (String key : actualKeys) {
+            for (String column : actualColumns) {
+                Object expected = upsertedData.get(key, column);
+                Object actual = fetchedData.get(key, column);
+                assertTrue("Upserted values should not be null", expected != null);
+                assertTrue("Fetched values should not be null", actual != null);
+                assertTrue("Values upserted and fetched do not match", expected.equals(actual));
+            }
+        }
+    }
+
+    /**
+     * Asks the writer to upsert {@code numRowsToUpsert} rows and returns the writer's data table.
+     */
+    private com.google.common.collect.Table<String, String, Object> upsertData(
+            DataWriter dataWriter, int numRowsToUpsert) throws SQLException {
+        dataWriter.upsertRows(numRowsToUpsert);
+        com.google.common.collect.Table<String, String, Object> written =
+                dataWriter.getDataTable();
+        return written;
+    }
+
+    /**
+     * Asks the reader to read its rows and returns the reader's data table.
+     */
+    private com.google.common.collect.Table<String, String, Object> fetchData(
+            DataReader dataReader) throws SQLException {
+        dataReader.readRows();
+        com.google.common.collect.Table<String, String, Object> fetched =
+                dataReader.getDataTable();
+        return fetched;
+    }
+
+    /**
+     * Builds a "select *" query plan over the tenant view and tags its scan with the view-TTL
+     * attributes (delete-expired enabled, mask-expired disabled) before creating a result set
+     * from the plan's iterator.
+     *
+     * NOTE(review): the result set created at the end is never iterated or closed in this
+     * method - confirm that creating the plan iterator is enough to run the scan.
+     *
+     * @param schemaBuilder supplies the tenant view name and the tenant id
+     * @param scnTimestamp value set as the CurrentSCN property of the connection
+     * @throws SQLException if connecting, planning or creating the result set fails
+     */
+    private void deleteData(SchemaBuilder schemaBuilder, long scnTimestamp) throws SQLException {
+
+        String viewName = schemaBuilder.getEntityTenantViewName();
+
+        // Tenant connection with CurrentSCN set to the supplied timestamp.
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions()
+                        .getTenantId();
+
+        try (Connection deleteConnection = DriverManager.getConnection(tenantConnectUrl, props);
+                final Statement statement = deleteConnection.createStatement()) {
+            deleteConnection.setAutoCommit(true);
+
+            // A plain select is used; the scan attributes set below carry the delete request.
+            final String deleteIfExpiredStatement = String.format("select * from  %s", viewName);
+            Preconditions.checkNotNull(deleteIfExpiredStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+
+
+            PTable table = PhoenixRuntime.getTable(deleteConnection, schemaBuilder.getDataOptions().getTenantId(), viewName);
+
+            // Empty column family and qualifier; the qualifier is taken as-is for non-encoded
+            // tables and encoded with the table's scheme otherwise.
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+            byte[]
+                    emptyColumnName =
+                    table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                            QueryConstants.EMPTY_COLUMN_BYTES :
+                            table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            // The attributes are set on the scan before the plan's iterator is created below.
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED, PDataType.FALSE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.VIEW_TTL, Bytes.toBytes(Long.valueOf(table.getViewTTL())));
+            PhoenixResultSet rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 585c17f..bf58ea2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -54,7 +54,6 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
 
-
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
 
     public static final String AGGREGATORS = "_Aggs";
@@ -98,6 +97,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB = "_RunUpdateStatsAsync";
     public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
     public static final String TX_SCN = "_TxScn";
+    public static final String VIEW_TTL = "_ViewTTL";
+    public static final String MASK_VIEW_TTL_EXPIRED = "_MASK_TTL_EXPIRED";
+    public static final String DELETE_VIEW_TTL_EXPIRED = "_DELETE_TTL_EXPIRED";
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
     public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS";
     public final static String SCAN_OFFSET = "_RowOffset";
@@ -121,7 +123,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     // In case of Index Write failure, we need to determine that Index mutation
     // is part of normal client write or Index Rebuilder. # PHOENIX-5080
     public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3);
-    
+
     public enum ReplayWrite {
         TABLE_AND_INDEX,
         INDEX_ONLY,
@@ -219,6 +221,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             TimeRange timeRange = scan.getTimeRange();
             scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
         }
+
         if (isRegionObserverFor(scan)) {
             // For local indexes, we need to throw if out of region as we'll get inconsistent
             // results otherwise while in other cases, it may just mean out client-side data
@@ -374,20 +377,23 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
     }
 
-    @Override
-    public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-        final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
-        final KeyValueScanner s) throws IOException {
+    @Override public KeyValueScanner preStoreScannerOpen(
+            final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final Scan scan, final NavigableSet<byte[]> targetCols, final KeyValueScanner s)
+            throws IOException {
 
-      if (scan.isRaw() || ScanInfoUtil.isKeepDeletedCells(store.getScanInfo()) || scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP || TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax())) {
-        return s;
-      }
-      
-      if (s!=null) {
-          s.close();
-      }
-      ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
-      return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
-          c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+        if (scan.isRaw() || ScanInfoUtil.isKeepDeletedCells(store.getScanInfo())
+                || scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP || TransactionUtil
+                .isTransactionalTimestamp(scan.getTimeRange().getMax())) {
+            return s;
+        }
+
+        if (s != null) {
+            s.close();
+        }
+
+        ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
+        return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
+                c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java
new file mode 100644
index 0000000..e133b10
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
+import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+
+/**
+ * 
+ * Coprocessor that checks whether the row is expired based on the TTL spec.
+ *
+ */
+public class TTLAwareRegionObserver extends BaseRegionObserver {
+    private static final Log LOG = LogFactory.getLog(TTLAwareRegionObserver.class);
+
+    /**
+     * A region scanner that checks the TTL expiration of rows
+     */
+    private static class TTLAwareRegionScanner implements RegionScanner {
+        RegionScanner scanner;
+        private Scan scan;
+        private byte[] emptyCF;
+        private byte[] emptyCQ;
+        private Region region;
+        private long minTimestamp;
+        private long maxTimestamp;
+        private long now;
+        private boolean deleteIfExpired;
+        private boolean maskIfExpired;
+
+        public TTLAwareRegionScanner(RegionCoprocessorEnvironment env,
+                                  Scan scan,
+                                  RegionScanner scanner) throws IOException {
+            this.scan = scan;
+            this.scanner = scanner;
+            // Delete mode wins: masking is only enabled when deletion was not requested.
+            deleteIfExpired = ScanUtil.isDeleteTTLExpiredRows(scan);
+            maskIfExpired = !deleteIfExpired && ScanUtil.isMaskTTLExpiredRows(scan);
+
+            region = env.getRegion();
+            emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+            emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+            byte[] txnScn = scan.getAttribute(BaseScannerRegionObserver.TX_SCN);
+            if (txnScn != null) {
+                scan.setTimeRange(scan.getTimeRange().getMin(), Bytes.toLong(txnScn));
+            }
+            minTimestamp = scan.getTimeRange().getMin();
+            maxTimestamp = scan.getTimeRange().getMax();
+            // Expiry is judged as of the scan's upper time bound unless that bound is LATEST_TIMESTAMP.
+            now = maxTimestamp != HConstants.LATEST_TIMESTAMP ? maxTimestamp : EnvironmentEdgeManager.currentTimeMillis();
+        }
+
+        @Override
+        public int getBatch() {
+            return scanner.getBatch();
+        }
+
+        @Override
+        public long getMaxResultSize() {
+            return scanner.getMaxResultSize();
+        }
+
+        @Override
+        public boolean next(List<Cell> result) throws IOException {
+            try {
+                boolean hasMore;
+                do {
+                    hasMore = scanner.next(result);
+                    if (result.isEmpty()) {
+                        break;
+                    }
+
+                    if (maskIfExpired && checkRowNotExpired(result)) {
+                        break;
+                    }
+
+                    if (deleteIfExpired && deleteRowIfExpired(result)) {
+                        break;
+                    }
+                    // skip this row
+                    // 1. if the row has expired (checkRowNotExpired returned false)
+                    // 2. if the row was not deleted (deleteRowIfExpired returned false and
+                    //  do not want it to count towards the deleted count)
+                    result.clear();
+                } while (hasMore);
+                return hasMore;
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                return false; // impossible
+            }
+        }
+
+        @Override
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+            throw new IOException("next with scannerContext should not be called in Phoenix environment");
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+            throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
+        }
+
+        @Override
+        public void close() throws IOException {
+            scanner.close();
+        }
+
+        @Override
+        public HRegionInfo getRegionInfo() {
+            return scanner.getRegionInfo();
+        }
+
+        @Override
+        public boolean isFilterDone() throws IOException {
+            return scanner.isFilterDone();
+        }
+
+        @Override
+        public boolean reseek(byte[] row) throws IOException {
+            return scanner.reseek(row);
+        }
+
+        @Override
+        public long getMvccReadPoint() {
+            return scanner.getMvccReadPoint();
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> result) throws IOException {
+            try {
+                boolean hasMore;
+                do {
+                    hasMore = scanner.nextRaw(result);
+                    if (result.isEmpty()) {
+                        break;
+                    }
+                    if (maskIfExpired && checkRowNotExpired(result)) {
+                        break;
+                    }
+
+                    if (deleteIfExpired && deleteRowIfExpired(result)) {
+                        break;
+                    }
+                    // skip this row
+                    // 1. if the row has expired (checkRowNotExpired returned false)
+                    // 2. if the row was not deleted (deleteRowIfExpired returned false and
+                    //  do not want it to count towards the deleted count)
+                    result.clear();
+                } while (hasMore);
+                return hasMore;
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                return false; // impossible
+            }
+        }
+
+        /**
+         * Issues a row delete when the row's empty-column cell shows that the row is past its TTL.
+         * @param cellList cells of the current row; the expiry check may remove the empty-column cell from it
+         * @return true if the list is empty or the row was expired and a delete was issued, otherwise false
+         * @throws IOException if the expiry check or the delete fails
+         */
+        private boolean deleteRowIfExpired(List<Cell> cellList) throws IOException {
+            if (cellList.isEmpty()) {
+                return true;
+            }
+
+            // Capture the row key first: the expiry check below may remove cells from the list.
+            byte[] rowKey = CellUtil.cloneRow(cellList.get(0));
+            if (checkRowNotExpired(cellList)) {
+                return false;
+            }
+
+            long ttl = ScanUtil.getViewTTL(this.scan);
+            if (LOG.isDebugEnabled()) {
+                // Row keys are arbitrary bytes, so print them in escaped binary form.
+                LOG.debug(String.format("***** VIEW-TTL: Deleting region = %s, row = %s, delete-ts = %d, max-ts = %d ****** ",
+                        region.getRegionInfo().getTable().getNameAsString(),
+                        Bytes.toStringBinary(rowKey),
+                        now - ttl, getMaxTimestamp(cellList)));
+            }
+
+            // The delete is stamped with the expiry cut-off (now - ttl).
+            Delete del = new Delete(rowKey, now - ttl);
+            Mutation[] mutations = new Mutation[]{del};
+            region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+            return true;
+        }
+
+
+        private boolean isEmptyColumn(Cell cell) {
+            return Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                    emptyCF, 0, emptyCF.length) == 0 &&
+                    Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                            emptyCQ, 0, emptyCQ.length) == 0;
+        }
+
+        // TODO : Remove it after we verify all SQLs include the empty column.
+        // Before we added ScanUtil.addEmptyColumnToScan some queries like select count(*) did not include
+        // the empty column in scan, thus this method was the fallback in those cases.
+        private boolean checkEmptyColumnNotExpired(byte[] rowKey) throws IOException {
+            LOG.warn("Scan " + scan + " did not return the empty column for " + region.getRegionInfo().getTable().getNameAsString());
+            Get get = new Get(rowKey);
+            get.setTimeRange(minTimestamp, maxTimestamp);
+            get.addColumn(emptyCF, emptyCQ);
+            Result result = region.get(get);
+            if (result.isEmpty()) {
+                LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
+                return false;
+            }
+            return !isTTLExpired(result.getColumnLatestCell(emptyCF, emptyCQ));
+        }
+
+        /**
+         * @param cellList cells of the current row; the empty-column cell is removed unless it is the only cell
+         * @return true if the list is empty or the row has not expired, otherwise false
+         * @throws IOException if the fallback lookup of the empty column fails
+         */
+        private boolean checkRowNotExpired(List<Cell> cellList) throws IOException {
+            if (cellList.isEmpty()) {
+                return true;
+            }
+            Cell cell = null;
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                cell = cellIterator.next();
+                if (isEmptyColumn(cell)) {
+                    boolean expired = isTTLExpired(cell);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(String.format("**********VIEW-TTL: Row expired for [%s], expired = %s ***************", cell.toString(), expired));
+                    }
+                    // Empty column is not supposed to be returned to the client except it is the only column included
+                    // in the scan
+                    if (cellList.size() > 1) {
+                        cellIterator.remove();
+                    }
+                    return !expired;
+                }
+            }
+            return checkEmptyColumnNotExpired(CellUtil.cloneRow(cell));
+        }
+
+        /**
+         * Finds the newest cell timestamp in a row.
+         * @param cellList cells of the row
+         * @return the largest timestamp among the cells, or 0 when none is greater than 0
+         */
+        private long getMaxTimestamp(List<Cell> cellList) {
+            // Start from 0 so an empty list yields 0.
+            long newest = 0;
+            for (Cell current : cellList) {
+                newest = Math.max(newest, current.getTimestamp());
+            }
+            return newest;
+        }
+
+        private boolean isTTLExpired(Cell cell) {
+            // Compare the cell's age against the TTL instead of computing ts + ttl:
+            // that sum overflows for very large TTL values and would wrongly report expiry.
+            long ts = cell.getTimestamp();
+            long ttl = ScanUtil.getViewTTL(this.scan);
+            long age = now - ts;
+            return age > ttl;
+        }
+    }
+
+
+
+    @Override
+    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+                                         Scan scan, RegionScanner s) throws IOException {
+        if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) {
+            return s;
+        }
+        // Serializing the whole scan to JSON is costly, so only do it when debug logging is on.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                    "********** VIEW-TTL: TTLAwareRegionObserver::postScannerOpen TTL for table = [%s], scan = [%s], VIEW_TTL = %d ***************",
+                    s.getRegionInfo().getTable().getNameAsString(),
+                    scan.toJSON(Integer.MAX_VALUE), ScanUtil.getViewTTL(scan)));
+        }
+        return new TTLAwareRegionScanner(c.getEnvironment(), scan, s);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 69bf8c1..68779aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -133,7 +133,8 @@ public class TableResultIterator implements ResultIterator {
         this.caches = caches;
         this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
-        IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
+        ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
+        ScanUtil.setScanAttributesForViewTTL(scan, table, plan.getContext().getConnection());
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4c1e61c..a8239d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -158,6 +158,7 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.coprocessor.SequenceRegionObserver;
 import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
+import org.apache.phoenix.coprocessor.TTLAwareRegionObserver;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest;
@@ -1066,6 +1067,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                 }
             }
+
+            // The priority for this co-processor should be set higher than the GlobalIndexChecker so that the read repair scans
+            // are intercepted by the TTLAwareRegionObserver and only the rows that are not ttl-expired are returned.
+            if (!SchemaUtil.isSystemTable(tableName)) {
+                if (!descriptor.hasCoprocessor(TTLAwareRegionObserver.class.getName())) {
+                    descriptor.addCoprocessor(TTLAwareRegionObserver.class.getName(), null, priority-2, null);
+                }
+            }
+
         } catch (IOException e) {
             throw ServerUtil.parseServerException(e);
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index aebe9fc..aa989fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -21,11 +21,9 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
-import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
-import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.PhoenixRuntime.getTable;
 
 import java.io.ByteArrayInputStream;
@@ -39,7 +37,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.cache.Cache;
@@ -60,8 +57,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -83,7 +78,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
@@ -91,9 +85,6 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
-import org.apache.phoenix.filter.ColumnProjectionFilter;
-import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
-import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -101,7 +92,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -141,6 +131,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import com.google.common.collect.Lists;
 
 public class IndexUtil {
+
     public static final String INDEX_COLUMN_NAME_SEP = ":";
     public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
 
@@ -919,86 +910,4 @@ public class IndexUtil {
             throw new IOException(e);
         }
     }
-
-    private static boolean containsOneOrMoreColumn(Scan scan) {
-        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
-        if (familyMap == null || familyMap.isEmpty()) {
-            return false;
-        }
-        for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
-            NavigableSet<byte[]> family = entry.getValue();
-            if (family != null && !family.isEmpty()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
-        boolean addedEmptyColumn = false;
-        Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
-        while (iterator.hasNext()) {
-            Filter filter = iterator.next();
-            if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
-                ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
-                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-                    scan.addColumn(emptyCF, emptyCQ);
-                }
-            }
-            else if (filter instanceof ColumnProjectionFilter) {
-                ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
-                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-                    scan.addColumn(emptyCF, emptyCQ);
-                }
-            }
-            else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
-                ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
-            }
-            else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
-                scan.addColumn(emptyCF, emptyCQ);
-                addedEmptyColumn = true;
-            }
-        }
-        if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
-            scan.addColumn(emptyCF, emptyCQ);
-        }
-    }
-
-    public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
-        if (table.isTransactional() || table.getType() != PTableType.INDEX) {
-            return;
-        }
-        PTable indexTable = table;
-        if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
-            return;
-        }
-        String schemaName = indexTable.getParentSchemaName().getString();
-        String tableName = indexTable.getParentTableName().getString();
-        PTable dataTable;
-        try {
-            dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
-        } catch (TableNotFoundException e) {
-            // This index table must be being deleted. No need to set the scan attributes
-            return;
-        }
-        if (!dataTable.getIndexes().contains(indexTable)) {
-            return;
-        }
-        if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
-            scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
-        }
-        scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
-        scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
-        IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
-        byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-        byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
-        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
-        if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
-            BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
-        }
-        addEmptyColumnToScan(scan, emptyCF, emptyCQ);
-    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 28ea349..318a1d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -23,6 +23,8 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_AN
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -53,13 +56,19 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.filter.BooleanExpressionFilter;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
 import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
@@ -70,8 +79,10 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
@@ -960,4 +971,140 @@ public class ScanUtil {
     public static void setClientVersion(Scan scan, int version) {
         scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(version));
     }
+
+    /**
+     * Reads the view TTL carried on the scan.
+     *
+     * @param scan the scan whose {@code VIEW_TTL} attribute is inspected
+     * @return the attribute decoded as a long, or 0 when the attribute is not set
+     */
+    public static long getViewTTL(Scan scan) {
+        byte[] ttlBytes = scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL);
+        return ttlBytes == null ? 0L : Bytes.toLong(ttlBytes);
+    }
+
+    public static boolean isMaskTTLExpiredRows(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED) != null &&
+                (Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED),
+                PDataType.TRUE_BYTES) == 0)
+                && scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL) != null;
+    }
+
+    public static boolean isDeleteTTLExpiredRows(Scan scan) {
+        return scan.getAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED) != null && (
+                Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED),
+                        PDataType.TRUE_BYTES) == 0)
+                && scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL) != null;
+    }
+
+    /**
+     * Ensures the empty column is covered by the scan's projection filters and, when the scan
+     * names explicit columns, by the scan itself.
+     * <p>
+     * Each filter returned by {@code ScanUtil.getFilterIterator(scan)} is handled by type:
+     * <ul>
+     * <li>{@link EncodedQualifiersColumnProjectionFilter}: {@code ENCODED_EMPTY_COLUMN_NAME} is
+     * added as a tracked column;</li>
+     * <li>{@link ColumnProjectionFilter}: {@code emptyCF}/{@code emptyCQ} is added as a tracked
+     * column;</li>
+     * <li>{@link MultiEncodedCQKeyValueComparisonFilter}: its minimum qualifier is set to
+     * {@code ENCODED_EMPTY_COLUMN_NAME};</li>
+     * <li>{@link FirstKeyOnlyFilter}: the empty column is added to the scan unconditionally.</li>
+     * </ul>
+     * For the two projection filters, and once more after the loop, the empty column is added to
+     * the scan only if the scan already selects at least one explicit column.
+     *
+     * @param scan    the scan to adjust in place
+     * @param emptyCF empty column family bytes
+     * @param emptyCQ empty column qualifier bytes
+     */
+    private static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
+        boolean addedEmptyColumn = false;
+        Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
+        while (iterator.hasNext()) {
+            Filter filter = iterator.next();
+            if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
+                ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
+                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                    scan.addColumn(emptyCF, emptyCQ);
+                    // Record the add so later filters and the post-loop check do not repeat it.
+                    addedEmptyColumn = true;
+                }
+            } else if (filter instanceof ColumnProjectionFilter) {
+                ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF),
+                        new ImmutableBytesPtr(emptyCQ));
+                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                    scan.addColumn(emptyCF, emptyCQ);
+                    // Record the add so later filters and the post-loop check do not repeat it.
+                    addedEmptyColumn = true;
+                }
+            } else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
+                ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
+            } else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
+                scan.addColumn(emptyCF, emptyCQ);
+                addedEmptyColumn = true;
+            }
+        }
+        // No filter triggered the add: still include the empty column when the scan lists
+        // explicit columns.
+        if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+            scan.addColumn(emptyCF, emptyCQ);
+        }
+    }
+
+    /**
+     * Checks whether the scan names at least one explicit column qualifier.
+     *
+     * @param scan the scan whose family map is inspected
+     * @return true if any family in the scan's family map has a non-null, non-empty qualifier
+     *         set; false when the map is null, empty, or holds only null/empty sets
+     */
+    private static boolean containsOneOrMoreColumn(Scan scan) {
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (familyMap != null) {
+            for (NavigableSet<byte[]> qualifiers : familyMap.values()) {
+                if (qualifiers != null && !qualifiers.isEmpty()) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sets the scan attributes used for index read repair when {@code table} is a
+     * non-transactional global index; otherwise leaves the scan untouched.
+     * <p>
+     * The method returns without changes when the table is transactional, is not of type
+     * {@code INDEX}, is not a {@code GLOBAL} index, its parent data table cannot be found, or the
+     * parent's index list does not contain it.
+     *
+     * @param scan              the scan to decorate in place
+     * @param table             the table being scanned
+     * @param phoenixConnection connection used to resolve the parent and index tables
+     * @throws SQLException if resolving table metadata fails for a reason other than the parent
+     *                      table not being found
+     */
+    public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table,
+            PhoenixConnection phoenixConnection) throws SQLException {
+        if (table.isTransactional() || table.getType() != PTableType.INDEX) {
+            return;
+        }
+
+        PTable indexTable = table;
+        if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
+            return;
+        }
+        // Resolve the data table this index was built on.
+        String schemaName = indexTable.getParentSchemaName().getString();
+        String tableName = indexTable.getParentTableName().getString();
+        PTable dataTable;
+        try {
+            dataTable =
+                    PhoenixRuntime.getTable(phoenixConnection,
+                            SchemaUtil.getTableName(schemaName, tableName));
+        } catch (TableNotFoundException e) {
+            // This index table must be being deleted. No need to set the scan attributes
+            return;
+        }
+        // MetaDataClient modifies the index table name for view indexes if the parent view of an
+        // index has a child view. Thus, we need to recreate a PTable object with the correct table
+        // name for the rest of this code to work.
+        // NOTE(review): substring(lastIndexOf + 1) assumes the separator is a single character —
+        // confirm against QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR.
+        if (table.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+            int lastIndexOf = table.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+            String indexName = table.getName().getString().substring(lastIndexOf + 1);
+            indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
+        }
+
+        // Nothing to do if the data table does not list this index.
+        if (!dataTable.getIndexes().contains(indexTable)) {
+            return;
+        }
+        // Serialize the index maintainer into the scan only if it has not been set already.
+        if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable),
+                    phoenixConnection);
+            scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+        }
+        scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+        scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME,
+                dataTable.getPhysicalName().getBytes());
+        // The empty column family/qualifier come from the index maintainer and are both
+        // published as scan attributes and added to the scan's projection below.
+        IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
+        byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+        byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+        // View constants are serialized only when the scan does not carry them yet.
+        if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
+            BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+        }
+        addEmptyColumnToScan(scan, emptyCF, emptyCQ);
+    }
+
+    /**
+     * Decorates the scan with the view-TTL attributes when {@code table} has a non-zero view TTL;
+     * does nothing otherwise.
+     * <p>
+     * Sets the empty column family and qualifier attributes, turns on
+     * {@code MASK_VIEW_TTL_EXPIRED}, stores the TTL as an 8-byte long under {@code VIEW_TTL}
+     * (the form {@link #getViewTTL(Scan)} decodes), and adds the empty column to the scan's
+     * projection.
+     *
+     * @param scan              the scan to decorate in place
+     * @param table             the table or view being scanned
+     * @param phoenixConnection not used by this method; kept for signature compatibility
+     * @throws SQLException declared for callers; nothing in this method's visible body throws it
+     */
+    public static void setScanAttributesForViewTTL(Scan scan, PTable table,
+            PhoenixConnection phoenixConnection) throws SQLException {
+
+        // Read once into a primitive long so the value is always encoded as 8 bytes, without the
+        // box/unbox round-trip through Long.valueOf.
+        long viewTTL = table.getViewTTL();
+        if (viewTTL == 0) {
+            return;
+        }
+
+        byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+        // Non-encoded tables use the literal empty column name; encoded tables use the
+        // scheme-specific encoding of the encoded empty column name.
+        byte[]
+                emptyColumnName =
+                table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                        QueryConstants.EMPTY_COLUMN_BYTES :
+                        table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+        scan.setAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED, PDataType.TRUE_BYTES);
+        scan.setAttribute(BaseScannerRegionObserver.VIEW_TTL, Bytes.toBytes(viewTTL));
+        addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName);
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
index ccc378f..8158df9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
@@ -20,7 +20,11 @@ package org.apache.phoenix.query;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeBasedTable;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -31,9 +35,12 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Arrays.asList;
@@ -189,6 +196,41 @@ public class PhoenixTestBuilder {
         List<Object> getValues(int rowIndex);
     }
 
+    /**
+     * A data reader used in tests to read test data back from the test database.
+     * The statement returned by {@link #getDML()} is executed by {@link #readRows()} and the
+     * rows read are exposed through {@link #getDataTable()}.
+     */
+    public interface DataReader {
+        /** Returns the columns that are projected from each row and validated. */
+        List<String> getValidationColumns();
+
+        /** Sets the columns to project from each row. */
+        void setValidationColumns(List<String> validationColumns);
+
+        /** Returns the columns that represent the pk/unique key for this data set. */
+        List<String> getRowKeyColumns();
+
+        /** Sets the columns that represent the pk/unique key for this data set. */
+        void setRowKeyColumns(List<String> rowKeyColumns);
+
+        /** Returns the connection used to execute the statement. */
+        Connection getConnection();
+
+        /** Sets the connection used to execute the statement. */
+        void setConnection(Connection connection);
+
+        /**
+         * Returns the target entity - whether to use the table, global-view, the tenant-view or
+         * an index table.
+         */
+        String getTargetEntity();
+
+        /** Sets the target entity to read from. */
+        void setTargetEntity(String targetEntity);
+
+        /** Returns the SQL string that {@link #readRows()} executes. */
+        String getDML();
+
+        /**
+         * Sets the SQL string to execute.
+         * NOTE(review): unlike the other setters this returns a String; BasicDataReader returns
+         * the value just assigned.
+         */
+        String setDML(String dmlStatement);
+
+        /** Template method to read a batch of rows using the above SQL. */
+        void readRows() throws SQLException;
+
+        /** Returns the data that was read, as a Table. */
+        Table<String, String, Object> getDataTable();
+    }
+
     // A Data Writer to be used in tests to upsert sample data (@see TestDataSupplier) into the sample schema.
     public interface DataWriter {
         // returns the columns that need to be upserted,
@@ -212,66 +254,142 @@ public class PhoenixTestBuilder {
 
         void setTargetEntity(String targetEntity);
 
+        // returns the columns that are set as the pk/unique key for this data set.
+        List<String> getRowKeyColumns();
+
+        void setRowKeyColumns(List<String> rowKeyColumns);
+
         // return the data provider for this writer
         DataSupplier getTestDataSupplier();
 
-        // template method to upsert rows using the above info.
-        void upsertRow(int rowIndex) throws SQLException;
-
         void setDataSupplier(DataSupplier dataSupplier);
+
+        // template method to upsert a single row using the above info.
+        List<Object> upsertRow(int rowIndex) throws SQLException;
+
+        // template method to upsert a batch of rows using the above info.
+        void upsertRows(int numRows) throws SQLException;
+
+        // Get the data that was written as a Table
+        Table<String, String, Object> getDataTable();
     }
 
-    /**
-     * Test SchemaBuilder defaults.
-     */
-    public static class DDLDefaults {
-        public static final int MAX_ROWS = 10000;
-        public static List<String> TABLE_PK_TYPES = asList("CHAR(15)", "CHAR(3)");
-        public static List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)");
-        public static List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)");
+    /**
+     * Base {@link DataReader} that supplies the {@link #readRows()} template method and holds the
+     * rows read in a Guava table keyed by (row key, column name).
+     */
+    public static abstract class AbstractDataReader implements DataReader {
+        // Rows read by the last readRows() call: row key -> column name -> value.
+        Table<String, String, Object> dataTable = TreeBasedTable.create();
 
-        public static List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR", "VARCHAR");
-        public static List<String> TABLE_COLUMNS = asList("COL1", "COL2", "COL3");
-        public static List<String> GLOBAL_VIEW_COLUMNS = asList("COL4", "COL5", "COL6");
-        public static List<String> TENANT_VIEW_COLUMNS = asList("COL7", "COL8", "COL9");
+        /** Returns the table populated by the most recent {@link #readRows()} call. */
+        public Table<String, String, Object> getDataTable() {
+            return dataTable;
+        }
 
-        public static List<String> TABLE_COLUMN_FAMILIES = asList(null, null, null);
-        public static List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null, null, null);
-        public static List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null, null, null);
+        /**
+         * Executes the statement from {@code getDML()} on {@code getConnection()} and stores every
+         * validation column of every returned row in {@code dataTable}, replacing earlier content.
+         * <p>
+         * The row key is the "-"-joined string form of the row-key column values, or of all
+         * validation column values when no row-key columns are configured.
+         *
+         * @throws SQLException if the query fails; the error is logged and rethrown
+         */
+        public void readRows() throws SQLException {
+            dataTable.clear();
+            String sql = getDML();
+            Connection connection = getConnection();
+            try (Statement stmt = connection.createStatement()) {
+
+                final PhoenixStatement pstmt = stmt.unwrap(PhoenixStatement.class);
+                // NOTE(review): rs is not closed explicitly; presumably it is released when stmt
+                // is closed by the try-with-resources — confirm for PhoenixStatement.
+                ResultSet rs = pstmt.executeQuery(sql);
+                List<String> cols = getValidationColumns();
+                List<Object> values = Lists.newArrayList();
+                // An empty set means "no row-key columns configured": every column then
+                // contributes to the row key (see the loop below).
+                Set<String> rowKeys = getRowKeyColumns() == null || getRowKeyColumns().isEmpty() ?
+                        Sets.<String>newHashSet() :
+                        Sets.newHashSet(getRowKeyColumns());
+                List<String> rowKeyParts = Lists.newArrayList();
+                while (rs.next()) {
+                    for (String col : cols) {
+                        Object val = rs.getObject(col);
+                        values.add(val);
+                        // NOTE(review): val.toString() throws NullPointerException when the
+                        // column value is SQL NULL. Both branches do the same thing and could
+                        // be merged into one condition.
+                        if (rowKeys.isEmpty()) {
+                            rowKeyParts.add(val.toString());
+                        }
+                        else if (rowKeys.contains(col)) {
+                            rowKeyParts.add(val.toString());
+                        }
+                    }
 
-        public static List<String> TABLE_PK_COLUMNS = asList("OID", "KP");
-        public static List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID");
-        public static List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID");
+                    String rowKey = Joiner.on("-").join(rowKeyParts);
+                    for (int v = 0; v < values.size(); v++) {
+                        dataTable.put(rowKey,cols.get(v), values.get(v));
+                    }
+                    // Reset the per-row buffers before reading the next row.
+                    values.clear();
+                    rowKeyParts.clear();
+                }
+                LOGGER.info(String.format("########## rows: %d", dataTable.rowKeySet().size()));
 
-        public static List<String> TABLE_INDEX_COLUMNS = asList("COL1");
-        public static List<String> TABLE_INCLUDE_COLUMNS = asList("COL3");
+            } catch (SQLException e) {
+                LOGGER.error(String.format(" Error [%s] initializing Reader. ",
+                        e.getMessage()));
+                throw e;
+            }
+        }
+    }
 
-        public static List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4");
-        public static List<String> GLOBAL_VIEW_INCLUDE_COLUMNS = asList("COL6");
+    /**
+     * A {@link DataReader} implementation that keeps every setting in a plain field and returns
+     * it unchanged from the matching getter.
+     */
+    public static class BasicDataReader extends AbstractDataReader {
 
-        public static List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9");
-        public static List<String> TENANT_VIEW_INCLUDE_COLUMNS = asList("COL7");
+        Connection connection;
+        String targetEntity;
+        String dmlStatement;
+        List<String> validationColumns;
+        List<String> rowKeyColumns;
 
-        public static String
-                DEFAULT_TABLE_PROPS =
-                "COLUMN_ENCODED_BYTES=0, MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'";
-        public static String DEFAULT_TABLE_INDEX_PROPS = "";
-        public static String DEFAULT_GLOBAL_VIEW_PROPS = "";
-        public static String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = "";
-        public static String DEFAULT_TENANT_VIEW_PROPS = "";
-        public static String DEFAULT_TENANT_VIEW_INDEX_PROPS = "";
-        public static String DEFAULT_KP = "0EC";
-        public static String DEFAULT_SCHEMA_NAME = "TEST_ENTITY";
-        public static String DEFAULT_TENANT_ID_FMT = "00D0t%03d%s";
 
-        public static String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost";
+        // Returns the SQL statement previously stored with setDML.
+        @Override public String getDML() {
+            return this.dmlStatement;
+        }
 
+        // Stores the SQL statement and returns the value just stored.
+        @Override public String setDML(String dmlStatement) {
+            return this.dmlStatement = dmlStatement;
+        }
+
+        // Returns the columns that need to be projected when the statement is executed.
+        @Override public List<String> getValidationColumns() {
+            return this.validationColumns;
+        }
+
+        @Override public void setValidationColumns(List<String> validationColumns) {
+            this.validationColumns = validationColumns;
+        }
+
+        // Returns the columns that are set as the pk/unique key for this data set.
+        @Override public  List<String> getRowKeyColumns() {
+            return this.rowKeyColumns;
+        }
+
+        @Override public void setRowKeyColumns(List<String> rowKeyColumns) {
+            this.rowKeyColumns = rowKeyColumns;
+        }
+
+        @Override public Connection getConnection() {
+            return connection;
+        }
+
+        @Override public void setConnection(Connection connection) {
+            this.connection = connection;
+        }
+
+        @Override public String getTargetEntity() {
+            return targetEntity;
+        }
+
+        @Override public void setTargetEntity(String targetEntity) {
+            this.targetEntity = targetEntity;
+        }
     }
 
+
     // Provides template method for upserting rows
     public static abstract class AbstractDataWriter implements DataWriter {
+        Table<String, String, Object> dataTable = TreeBasedTable.create();
 
-        public void upsertRow(int rowIndex) throws SQLException {
+        /**
+         * Returns the table of written data; it is cleared and repopulated by
+         * {@code upsertRows(int)}, keyed by (row key, column name).
+         */
+        public Table<String, String, Object> getDataTable() {
+            return dataTable;
+        }
+
+        // Upsert one row.
+        public List<Object> upsertRow(int rowIndex) throws SQLException {
             List<String> upsertColumns = Lists.newArrayList();
             List<Object> upsertValues = Lists.newArrayList();
 
@@ -294,7 +412,6 @@ public class PhoenixTestBuilder {
                 buf.append("?,");
             }
             buf.setCharAt(buf.length() - 1, ')');
-
             LOGGER.info(buf.toString());
 
             Connection connection = getConnection();
@@ -305,16 +422,83 @@ public class PhoenixTestBuilder {
                 stmt.execute();
                 connection.commit();
             }
+            return upsertValues;
+        }
+
+        /**
+         * Upserts rows 1..{@code numRows} from the data supplier into the target entity and
+         * commits once at the end. Every value written is also recorded in {@code dataTable}
+         * under (row key, column name), after the table has been cleared.
+         * <p>
+         * The row key is the "-"-joined string form of the values of the configured row-key
+         * columns, or of every upsert column when none are configured.
+         *
+         * @param numRows number of rows to upsert; row indexes passed to the supplier start at 1
+         * @throws SQLException if preparing, executing or committing the upsert fails
+         */
+        public void upsertRows(int numRows) throws SQLException {
+            dataTable.clear();
+
+            // Full-row update writes every upsert column; a partial update writes only the
+            // columns at the configured positions.
+            boolean fullRow = getColumnPositionsToUpdate().isEmpty();
+            List<String> columns = Lists.newArrayList();
+            if (fullRow) {
+                columns.addAll(getUpsertColumns());
+            } else {
+                List<String> allColumns = getUpsertColumns();
+                for (int position : getColumnPositionsToUpdate()) {
+                    columns.add(allColumns.get(position));
+                }
+            }
+
+            // Row-key columns default to all upsert columns when none were configured.
+            List<String> configuredRowKeys = getRowKeyColumns();
+            Set<String> rowKeyNames;
+            if (configuredRowKeys == null || configuredRowKeys.isEmpty()) {
+                rowKeyNames = Sets.<String>newHashSet(getUpsertColumns());
+            } else {
+                rowKeyNames = Sets.newHashSet(configuredRowKeys);
+            }
+
+            // Build the statement text and note which bound positions form the row key.
+            List<Integer> rowKeyPositions = Lists.newArrayList();
+            StringBuilder sql = new StringBuilder("UPSERT INTO ")
+                    .append(getTargetEntity())
+                    .append(" (").append(Joiner.on(",").join(columns)).append(") VALUES(");
+            for (int c = 0; c < columns.size(); c++) {
+                sql.append("?,");
+                if (rowKeyNames.contains(columns.get(c))) {
+                    rowKeyPositions.add(c);
+                }
+            }
+            // Turn the trailing character into the closing parenthesis.
+            sql.setCharAt(sql.length() - 1, ')');
+            String upsertSql = sql.toString();
+            LOGGER.info(upsertSql);
+
+            Connection connection = getConnection();
+            try (PreparedStatement stmt = connection.prepareStatement(upsertSql)) {
+
+                for (int rowIndex = 1; rowIndex <= numRows; rowIndex++) {
+                    List<Object> supplied = getTestDataSupplier().getValues(rowIndex);
+                    List<Object> rowValues = Lists.newArrayList();
+                    if (fullRow) {
+                        rowValues.addAll(supplied);
+                    } else {
+                        for (int position : getColumnPositionsToUpdate()) {
+                            rowValues.add(supplied.get(position));
+                        }
+                    }
+
+                    List<String> keyParts = Lists.newArrayList();
+                    for (int position : rowKeyPositions) {
+                        keyParts.add(rowValues.get(position).toString());
+                    }
+                    String rowKey = Joiner.on("-").join(keyParts);
+
+                    // Bind each value (JDBC parameters are 1-based) and mirror it in dataTable.
+                    for (int v = 0; v < rowValues.size(); v++) {
+                        Object value = rowValues.get(v);
+                        stmt.setObject(v + 1, value);
+                        dataTable.put(rowKey, columns.get(v), value);
+                    }
+                    stmt.execute();
+                }
+                connection.commit();
+            }
+        }
+
     }
 
-    // An implementation of the TestDataWriter.
+    // An implementation of the DataWriter.
     public static class BasicDataWriter extends AbstractDataWriter {
         List<String> upsertColumns = Lists.newArrayList();
         List<Integer> columnPositionsToUpdate = Lists.newArrayList();
         DataSupplier dataSupplier;
         Connection connection;
         String targetEntity;
+        List<String> rowKeyColumns;
+
 
         @Override public List<String> getUpsertColumns() {
             return upsertColumns;
@@ -348,6 +532,15 @@ public class PhoenixTestBuilder {
             this.targetEntity = targetEntity;
         }
 
+        // Returns the columns that are set as the pk/unique key for this data set
+        // (null until setRowKeyColumns has been called).
+        @Override public  List<String> getRowKeyColumns() {
+            return this.rowKeyColumns;
+        }
+
+        // Stores the columns to use as the pk/unique key; the list is kept by reference.
+        @Override public void setRowKeyColumns(List<String> rowKeyColumns) {
+            this.rowKeyColumns = rowKeyColumns;
+        }
+
         @Override public DataSupplier getTestDataSupplier() {
             return dataSupplier;
         }
@@ -358,6 +551,53 @@ public class PhoenixTestBuilder {
     }
 
     /**
+     * Test SchemaBuilder defaults.
+     */
+    public static class DDLDefaults {
+        // NOTE(review): apart from MAX_ROWS these are public, non-final statics, so any test can
+        // reassign them; presumably they are meant to be constants — confirm before making
+        // them final.
+        public static final int MAX_ROWS = 10000;
+        // SQL types of the primary-key columns for the table, global view and tenant view;
+        // presumably matched by position with the *_PK_COLUMNS lists below — verify against
+        // the schema builder.
+        public static List<String> TABLE_PK_TYPES = asList("CHAR(15)", "CHAR(3)");
+        public static List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)");
+        public static List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)");
+
+        // Non-PK column types and the column names declared at each level.
+        public static List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR", "VARCHAR");
+        public static List<String> TABLE_COLUMNS = asList("COL1", "COL2", "COL3");
+        public static List<String> GLOBAL_VIEW_COLUMNS = asList("COL4", "COL5", "COL6");
+        public static List<String> TENANT_VIEW_COLUMNS = asList("COL7", "COL8", "COL9");
+
+        // Column family per column at each level; every entry is null here.
+        public static List<String> TABLE_COLUMN_FAMILIES = asList(null, null, null);
+        public static List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null, null, null);
+        public static List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null, null, null);
+
+        // Primary-key column names at each level.
+        public static List<String> TABLE_PK_COLUMNS = asList("OID", "KP");
+        public static List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID");
+        public static List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID");
+
+        // Indexed and included columns for the index at each level.
+        public static List<String> TABLE_INDEX_COLUMNS = asList("COL1");
+        public static List<String> TABLE_INCLUDE_COLUMNS = asList("COL3");
+
+        public static List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4");
+        public static List<String> GLOBAL_VIEW_INCLUDE_COLUMNS = asList("COL6");
+
+        public static List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9");
+        public static List<String> TENANT_VIEW_INCLUDE_COLUMNS = asList("COL7");
+
+        // Default property strings for each entity and its index; all but the table's are empty.
+        public static String
+                DEFAULT_TABLE_PROPS =
+                "COLUMN_ENCODED_BYTES=0, MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'";
+        public static String DEFAULT_TABLE_INDEX_PROPS = "";
+        public static String DEFAULT_GLOBAL_VIEW_PROPS = "";
+        public static String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = "";
+        public static String DEFAULT_TENANT_VIEW_PROPS = "";
+        public static String DEFAULT_TENANT_VIEW_INDEX_PROPS = "";
+        // Default key prefix; presumably the value used for the KP primary-key column — confirm.
+        public static String DEFAULT_KP = "ECZ";
+        public static String DEFAULT_SCHEMA_NAME = "TEST_ENTITY";
+        // Format string for tenant ids, taking an int (%03d) and a String (%s).
+        public static String DEFAULT_TENANT_ID_FMT = "00D0t%03d%s";
+
+        public static String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost";
+
+    }
+
+    /**
      * Schema builder for test writers to prepare various test scenarios.
      * It can be used to define the following type of schemas -
      * 1. Simple Table.
@@ -741,20 +981,20 @@ public class PhoenixTestBuilder {
                 		"useTenantConnectionForGlobalView and useGlobalConnectionOnly both cannot be true");
             }
 
-            String tableName = SchemaUtil.getEscapedArgument("T_" + dataOptions.uniqueName);
-            String globalViewName = SchemaUtil.getEscapedArgument("V_" + dataOptions.uniqueName);
+            String tableName = SchemaUtil.normalizeIdentifier("T_" + dataOptions.uniqueName);
+            String globalViewName = SchemaUtil.normalizeIdentifier("V_" + dataOptions.uniqueName);
             String
                     tableSchemaName =
-                    tableEnabled ? SchemaUtil.getEscapedArgument(tableOptions.schemaName) : "";
+                    tableEnabled ? SchemaUtil.normalizeIdentifier(tableOptions.schemaName) : "";
             String
                     globalViewSchemaName =
                     globalViewEnabled ?
-                            SchemaUtil.getEscapedArgument(globalViewOptions.schemaName) :
+                            SchemaUtil.normalizeIdentifier(globalViewOptions.schemaName) :
                             "";
             String
                     tenantViewSchemaName =
                     tenantViewEnabled ?
-                            SchemaUtil.getEscapedArgument(tenantViewOptions.schemaName) :
+                            SchemaUtil.normalizeIdentifier(tenantViewOptions.schemaName) :
                             "";
             entityTableName = SchemaUtil.getTableName(tableSchemaName, tableName);
             entityGlobalViewName = SchemaUtil.getTableName(globalViewSchemaName, globalViewName);
@@ -767,7 +1007,7 @@ public class PhoenixTestBuilder {
                                     (String.format("Z%02d", dataOptions.getViewNumber())) :
                                     DDLDefaults.DEFAULT_KP);
 
-            String tenantViewName = SchemaUtil.getEscapedArgument(entityKeyPrefix);
+            String tenantViewName = SchemaUtil.normalizeIdentifier(entityKeyPrefix);
             entityTenantViewName = SchemaUtil.getTableName(tenantViewSchemaName, tenantViewName);
             String globalViewCondition = String.format("KP = '%s'", entityKeyPrefix);
 
@@ -787,7 +1027,7 @@ public class PhoenixTestBuilder {
                 if (tableIndexEnabled && !tableIndexCreated) {
                     String
                             indexOnTableName =
-                            SchemaUtil.getEscapedArgument(String.format("IDX_%s",
+                            SchemaUtil.normalizeIdentifier(String.format("IDX_%s",
                                     SchemaUtil.normalizeIdentifier(tableName)));
                     globalConnection.createStatement().execute(
                             buildCreateIndexStmt(indexOnTableName, entityTableName,