You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/01/11 01:53:04 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5501 Add support for VIEW_TTL table property during DDL.

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new ccddbe9  PHOENIX-5501 Add support for VIEW_TTL table property during DDL.
ccddbe9 is described below

commit ccddbe96ab197ec06fc57fde2ed2c22024c0757a
Author: Jacob Isaac <ji...@salesforce.com>
AuthorDate: Mon Oct 14 10:26:18 2019 -0700

    PHOENIX-5501 Add support for VIEW_TTL table property during DDL.
    
    Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java | 397 +++++++++++++++++++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  31 +-
 .../coprocessor/generated/PTableProtos.java        | 288 ++++++++++++++-
 .../apache/phoenix/exception/SQLExceptionCode.java |   4 +
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |  12 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   2 +
 .../org/apache/phoenix/schema/DelegateTable.java   |  10 +
 .../org/apache/phoenix/schema/MetaDataClient.java  | 114 +++++-
 .../java/org/apache/phoenix/schema/PTable.java     |  15 +
 .../java/org/apache/phoenix/schema/PTableImpl.java |  67 +++-
 .../org/apache/phoenix/schema/TableProperty.java   |  33 ++
 .../java/org/apache/phoenix/util/ViewUtil.java     |   9 +
 phoenix-protocol/src/main/PTable.proto             |   3 +
 13 files changed, 962 insertions(+), 23 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
new file mode 100644
index 0000000..ee77b33
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ViewTTLIT extends ParallelStatsDisabledIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLIT.class);
+    private static final String ORG_ID_FMT = "00D0x000%s";
+    private static final String ID_FMT = "00A0y000%07d";
+    private static final String VIEW_TTL_HEADER_SQL =  "SELECT VIEW_TTL FROM SYSTEM.CATALOG "
+            + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = '%s'";
+
+    private static final String ALTER_VIEW_TTL_SQL = "ALTER VIEW %s.%s set VIEW_TTL=%d";
+
+    // Scans the HBase rows directly for the view ttl related header rows column and asserts
+    private void assertViewHeaderRowsHaveViewTTLRelatedCells(String schemaName, long minTimestamp,
+            boolean rawScan, int expectedRows) throws IOException, SQLException {
+
+        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+        RowFilter schemaNameFilter = new RowFilter(
+                CompareFilter.CompareOp.EQUAL,
+                new SubstringComparator(schemaName)
+        );
+        QualifierFilter viewTTLQualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
+                new BinaryComparator(PhoenixDatabaseMetaData.VIEW_TTL_BYTES));
+        filterList.addFilter(schemaNameFilter);
+        filterList.addFilter(viewTTLQualifierFilter);
+        try (Table tbl = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
+                .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+
+            Scan allRows = new Scan();
+            allRows.setRaw(rawScan);
+            allRows.setTimeRange(minTimestamp, HConstants.LATEST_TIMESTAMP);
+            allRows.setFilter(filterList);
+            ResultScanner scanner = tbl.getScanner(allRows);
+            int numMatchingRows = 0;
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                numMatchingRows +=
+                        result.containsColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                PhoenixDatabaseMetaData.VIEW_TTL_BYTES) ? 1 : 0;
+            }
+            assertEquals(String.format("Expected rows do not match for table = %s at timestamp %d",
+                    Bytes.toString(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES), minTimestamp), expectedRows, numMatchingRows);
+        }
+
+    }
+
+    private void assertSyscatHaveViewTTLRelatedColumns(String tenantId, String schemaName, String tableName, String tableType, long ttlValueExpected)
+            throws SQLException {
+
+        try (Connection connection = DriverManager.getConnection(getUrl())) {
+            Statement stmt = connection.createStatement();
+            String tenantClause = tenantId == null || tenantId.isEmpty() ? "TENANT_ID IS NULL" : String.format("TENANT_ID = '%s'", tenantId);
+            String sql = String.format(VIEW_TTL_HEADER_SQL, tenantClause, schemaName, tableName, tableType);
+            stmt.execute(sql);
+            ResultSet rs = stmt.getResultSet() ;
+            long actualTTLValueReturned = rs.next() ? rs.getLong(1) : 0;
+
+            assertEquals(String.format("Expected rows do not match for schema = %s, table = %s",
+                    schemaName, tableName), ttlValueExpected, actualTTLValueReturned);
+        }
+    }
+
+    private String stripQuotes(String name) {
+        return name.replace("\"", "");
+    }
+
+
+    /**
+     * -----------------
+     * Test methods
+     * -----------------
+     */
+
+    @Test
+    public void testWithBasicGlobalViewWithNoViewTTLDefined() throws Exception {
+
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableDefaults()
+                .withGlobalViewDefaults()
+                .build();
+
+        // Expected 2 rows - one for Table and GlobalView each.
+        // Since the VIEW_TTL property values are not being set, we expect the view header columns to show up in raw scans only.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, true, 2);
+    }
+
+
+
+    @Test
+    public void testViewTTLWithTableLevelTTLFails() throws Exception {
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true,TTL=100");
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps("VIEW_TTL=1000");
+        try {
+            schemaBuilder
+                    .withTableOptions(tableOptions)
+                    .withTenantViewOptions(tenantViewOptions)
+                    .buildNewView();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.CANNOT_SET_OR_ALTER_VIEW_TTL_FOR_TABLE_WITH_TTL.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testViewTTLWithViewIndexFails() throws Exception {
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        TenantViewIndexOptions tenantViewIndexOptions = TenantViewIndexOptions.withDefaults();
+        tenantViewIndexOptions.setIndexProps("VIEW_TTL=1000");
+        try {
+            schemaBuilder
+                    .withTableOptions(tableOptions)
+                    .withTenantViewDefaults()
+                    .withTenantViewIndexOptions(tenantViewIndexOptions)
+                    .buildNewView();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.VIEW_TTL_SUPPORTED_FOR_VIEWS_ONLY.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testViewTTLForLevelOneView() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps("VIEW_TTL=1000");
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withTenantViewIndexDefaults()
+                .buildNewView();
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String indexOnTenantViewName = String.format("IDX_%s", stripQuotes(schemaBuilder.getEntityKeyPrefix()));
+
+        // Expected 2 rows - one for TenantView and ViewIndex each.
+        // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 2);
+        // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
+
+    }
+
+    @Test
+    public void testViewTTLForLevelTwoView() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions
+                globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps("VIEW_TTL=300000");
+
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions
+                globalViewIndexOptions =
+                PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewWithOverrideOptions = TenantViewOptions.withDefaults();
+        tenantViewWithOverrideOptions.setTableProps("VIEW_TTL=1000");
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewWithOverrideOptions)
+                .withTenantViewIndexDefaults()
+                .buildWithNewTenant();
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        String tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String indexOnGlobalViewName = String.format("IDX_%s", globalViewName);
+        String indexOnTenantViewName = String.format("IDX_%s", stripQuotes(schemaBuilder.getEntityKeyPrefix()));
+
+        // Expected 4 rows - one for GlobalView, one for TenantView and ViewIndex each.
+        // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 4);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, indexOnGlobalViewName, PTableType.INDEX.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
+
+        // Without override
+        startTime = System.currentTimeMillis();
+
+        TenantViewOptions tenantViewWithoutOverrideOptions = TenantViewOptions.withDefaults();
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewWithoutOverrideOptions)
+                .withTenantViewIndexDefaults()
+                .buildWithNewTenant();
+
+        tenantId = schemaBuilder.getDataOptions().getTenantId();
+        schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        globalViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        indexOnGlobalViewName = String.format("IDX_%s", globalViewName);
+        indexOnTenantViewName = String.format("IDX_%s", stripQuotes(schemaBuilder.getEntityKeyPrefix()));
+
+
+        // Expected 2 rows - one for TenantView and ViewIndex each.
+        // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 2);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, indexOnGlobalViewName, PTableType.INDEX.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be same as the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 300000);
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 300000);
+    }
+
+    @Test
+    public void testViewTTLForWhenTTLIsZero() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        // Client can also specify VIEW_TTL=NONE
+        tenantViewOptions.setTableProps("VIEW_TTL=0");
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withTenantViewIndexDefaults()
+                .buildNewView();
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String indexOnTenantViewName = String.format("IDX_%s", stripQuotes(schemaBuilder.getEntityKeyPrefix()));
+
+        // Expected 3 deleted rows - one for Table, one for TenantView and ViewIndex each.
+        // Since the VIEW_TTL property values are not being set or being set to zero,
+        // we expect the view header columns to show up in raw scans only.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, true, 3);
+        // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 0);
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 0);
+
+    }
+
+    @Test
+    public void testViewTTLWithAlterView() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        // Client can also specify VIEW_TTL=0
+        tenantViewOptions.setTableProps("VIEW_TTL=NONE");
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withTenantViewIndexDefaults()
+                .buildNewView();
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String indexOnTenantViewName = String.format("IDX_%s", stripQuotes(schemaBuilder.getEntityKeyPrefix()));
+
+        // Expected 3 deleted rows - one for Table, one for TenantView and ViewIndex each.
+        // Since the VIEW_TTL property values are not being set or being set to zero,
+        // we expect the view header columns to show up in raw scans only.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, true, 3);
+        // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 0);
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 0);
+
+        String tenantURL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+        try (Connection connection = DriverManager.getConnection(tenantURL)) {
+            Statement stmt = connection.createStatement();
+            String sql = String.format(ALTER_VIEW_TTL_SQL, schemaName, tenantViewName, 1000);
+            stmt.execute(sql);
+        }
+
+        // Expected 2 rows - one for TenantView and ViewIndex each.
+        // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too.
+        assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 2);
+        // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
+
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 1d5f99a..05c0b4d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -72,6 +72,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL_HWM_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VIEW_TTL_HWM;
 import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
@@ -318,6 +322,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
     private static final KeyValue ENCODING_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODING_SCHEME_BYTES);
     private static final KeyValue USE_STATS_FOR_PARALLELIZATION_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, USE_STATS_FOR_PARALLELIZATION_BYTES);
+    private static final KeyValue VIEW_TTL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TTL_BYTES);
+    private static final KeyValue VIEW_TTL_HWM_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TTL_HWM_BYTES);
 
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
@@ -349,7 +355,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             APPEND_ONLY_SCHEMA_KV,
             STORAGE_SCHEME_KV,
             ENCODING_SCHEME_KV,
-            USE_STATS_FOR_PARALLELIZATION_KV
+            USE_STATS_FOR_PARALLELIZATION_KV,
+            VIEW_TTL_KV,
+            VIEW_TTL_HWM_KV
     );
 
     static {
@@ -385,6 +393,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
     private static final int QUALIFIER_ENCODING_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(ENCODING_SCHEME_KV);
     private static final int USE_STATS_FOR_PARALLELIZATION_INDEX = TABLE_KV_COLUMNS.indexOf(USE_STATS_FOR_PARALLELIZATION_KV);
+    private static final int VIEW_TTL_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_TTL_KV);
+    private static final int VIEW_TTL_HWM_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_TTL_HWM_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -1121,6 +1131,22 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         Cell useStatsForParallelizationKv = tableKeyValues[USE_STATS_FOR_PARALLELIZATION_INDEX];
         Boolean useStatsForParallelization = useStatsForParallelizationKv == null ? null : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(useStatsForParallelizationKv.getValueArray(), useStatsForParallelizationKv.getValueOffset(), useStatsForParallelizationKv.getValueLength()));
 
+        Cell viewTTLKv = tableKeyValues[VIEW_TTL_INDEX];
+        long viewTTL = viewTTLKv == null ? VIEW_TTL_NOT_DEFINED :
+                PLong.INSTANCE.getCodec().decodeLong(viewTTLKv.getValueArray(),
+                        viewTTLKv.getValueOffset(), SortOrder.getDefault());
+
+        Cell viewTTLHWMKv = tableKeyValues[VIEW_TTL_HWM_INDEX];
+        long viewTTLHWM = viewTTLHWMKv == null ? MIN_VIEW_TTL_HWM :
+                PLong.INSTANCE.getCodec().decodeLong(viewTTLHWMKv.getValueArray(),
+                        viewTTLHWMKv.getValueOffset(), SortOrder.getDefault());
+
+        // Check the cell tag to see whether the view has modified this property
+        final byte[] tagViewTTL = (viewTTLKv == null) ?
+                HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(viewTTLKv);
+        boolean viewModifiedViewTTL = (PTableType.VIEW.equals(tableType)) &&
+                Bytes.contains(tagViewTTL, VIEW_MODIFIED_PROPERTY_BYTES);
+
         // Check the cell tag to see whether the view has modified this property
         final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY : CellUtil.getTagArray(useStatsForParallelizationKv);
@@ -1196,6 +1222,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 .setBaseColumnCount(baseColumnCount)
                 .setEncodedCQCounter(cqCounter)
                 .setUseStatsForParallelization(useStatsForParallelization)
+                .setViewTTL(viewTTL)
+                .setViewTTLHighWaterMark(viewTTLHWM)
                 .setExcludedColumns(ImmutableList.<PColumn>of())
                 .setTenantId(tenantId)
                 .setSchemaName(schemaName)
@@ -1211,6 +1239,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         ImmutableList.<PName>of() : ImmutableList.copyOf(physicalTables))
                 .setViewModifiedUpdateCacheFrequency(viewModifiedUpdateCacheFrequency)
                 .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
+                .setViewModifiedViewTTL(viewModifiedViewTTL)
                 .setColumns(columns)
                 .build();
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 6ffedeb..2d14171 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3724,6 +3724,36 @@ public final class PTableProtos {
      * <code>optional bool viewModifiedUseStatsForParallelization = 41;</code>
      */
     boolean getViewModifiedUseStatsForParallelization();
+
+    // optional int64 viewTTL = 42;
+    /**
+     * <code>optional int64 viewTTL = 42;</code>
+     */
+    boolean hasViewTTL();
+    /**
+     * <code>optional int64 viewTTL = 42;</code>
+     */
+    long getViewTTL();
+
+    // optional int64 viewTTLHighWaterMark = 43;
+    /**
+     * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+     */
+    boolean hasViewTTLHighWaterMark();
+    /**
+     * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+     */
+    long getViewTTLHighWaterMark();
+
+    // optional bool viewModifiedViewTTL = 44;
+    /**
+     * <code>optional bool viewModifiedViewTTL = 44;</code>
+     */
+    boolean hasViewModifiedViewTTL();
+    /**
+     * <code>optional bool viewModifiedViewTTL = 44;</code>
+     */
+    boolean getViewModifiedViewTTL();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3995,6 +4025,21 @@ public final class PTableProtos {
               viewModifiedUseStatsForParallelization_ = input.readBool();
               break;
             }
+            case 336: {
+              bitField1_ |= 0x00000010;
+              viewTTL_ = input.readInt64();
+              break;
+            }
+            case 344: {
+              bitField1_ |= 0x00000020;
+              viewTTLHighWaterMark_ = input.readInt64();
+              break;
+            }
+            case 352: {
+              bitField1_ |= 0x00000040;
+              viewModifiedViewTTL_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4823,6 +4868,54 @@ public final class PTableProtos {
       return viewModifiedUseStatsForParallelization_;
     }
 
+    // optional int64 viewTTL = 42;
+    public static final int VIEWTTL_FIELD_NUMBER = 42;
+    private long viewTTL_;
+    /**
+     * <code>optional int64 viewTTL = 42;</code>
+     */
+    public boolean hasViewTTL() {
+      return ((bitField1_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional int64 viewTTL = 42;</code>
+     */
+    public long getViewTTL() {
+      return viewTTL_;
+    }
+
+    // optional int64 viewTTLHighWaterMark = 43;
+    public static final int VIEWTTLHIGHWATERMARK_FIELD_NUMBER = 43;
+    private long viewTTLHighWaterMark_;
+    /**
+     * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+     */
+    public boolean hasViewTTLHighWaterMark() {
+      return ((bitField1_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+     */
+    public long getViewTTLHighWaterMark() {
+      return viewTTLHighWaterMark_;
+    }
+
+    // optional bool viewModifiedViewTTL = 44;
+    public static final int VIEWMODIFIEDVIEWTTL_FIELD_NUMBER = 44;
+    private boolean viewModifiedViewTTL_;
+    /**
+     * <code>optional bool viewModifiedViewTTL = 44;</code>
+     */
+    public boolean hasViewModifiedViewTTL() {
+      return ((bitField1_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional bool viewModifiedViewTTL = 44;</code>
+     */
+    public boolean getViewModifiedViewTTL() {
+      return viewModifiedViewTTL_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4864,6 +4957,9 @@ public final class PTableProtos {
       viewIndexIdType_ = 5;
       viewModifiedUpdateCacheFrequency_ = false;
       viewModifiedUseStatsForParallelization_ = false;
+      viewTTL_ = 0L;
+      viewTTLHighWaterMark_ = 0L;
+      viewModifiedViewTTL_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5051,6 +5147,15 @@ public final class PTableProtos {
       if (((bitField1_ & 0x00000008) == 0x00000008)) {
         output.writeBool(41, viewModifiedUseStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(42, viewTTL_);
+      }
+      if (((bitField1_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(43, viewTTLHighWaterMark_);
+      }
+      if (((bitField1_ & 0x00000040) == 0x00000040)) {
+        output.writeBool(44, viewModifiedViewTTL_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5225,6 +5330,18 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(41, viewModifiedUseStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(42, viewTTL_);
+      }
+      if (((bitField1_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(43, viewTTLHighWaterMark_);
+      }
+      if (((bitField1_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(44, viewModifiedViewTTL_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5436,6 +5553,21 @@ public final class PTableProtos {
         result = result && (getViewModifiedUseStatsForParallelization()
             == other.getViewModifiedUseStatsForParallelization());
       }
+      result = result && (hasViewTTL() == other.hasViewTTL());
+      if (hasViewTTL()) {
+        result = result && (getViewTTL()
+            == other.getViewTTL());
+      }
+      result = result && (hasViewTTLHighWaterMark() == other.hasViewTTLHighWaterMark());
+      if (hasViewTTLHighWaterMark()) {
+        result = result && (getViewTTLHighWaterMark()
+            == other.getViewTTLHighWaterMark());
+      }
+      result = result && (hasViewModifiedViewTTL() == other.hasViewModifiedViewTTL());
+      if (hasViewModifiedViewTTL()) {
+        result = result && (getViewModifiedViewTTL()
+            == other.getViewModifiedViewTTL());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5609,6 +5741,18 @@ public final class PTableProtos {
         hash = (37 * hash) + VIEWMODIFIEDUSESTATSFORPARALLELIZATION_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getViewModifiedUseStatsForParallelization());
       }
+      if (hasViewTTL()) {
+        hash = (37 * hash) + VIEWTTL_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getViewTTL());
+      }
+      if (hasViewTTLHighWaterMark()) {
+        hash = (37 * hash) + VIEWTTLHIGHWATERMARK_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getViewTTLHighWaterMark());
+      }
+      if (hasViewModifiedViewTTL()) {
+        hash = (37 * hash) + VIEWMODIFIEDVIEWTTL_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getViewModifiedViewTTL());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5813,6 +5957,12 @@ public final class PTableProtos {
         bitField1_ = (bitField1_ & ~0x00000040);
         viewModifiedUseStatsForParallelization_ = false;
         bitField1_ = (bitField1_ & ~0x00000080);
+        viewTTL_ = 0L;
+        bitField1_ = (bitField1_ & ~0x00000100);
+        viewTTLHighWaterMark_ = 0L;
+        bitField1_ = (bitField1_ & ~0x00000200);
+        viewModifiedViewTTL_ = false;
+        bitField1_ = (bitField1_ & ~0x00000400);
         return this;
       }
 
@@ -6019,6 +6169,18 @@ public final class PTableProtos {
           to_bitField1_ |= 0x00000008;
         }
         result.viewModifiedUseStatsForParallelization_ = viewModifiedUseStatsForParallelization_;
+        if (((from_bitField1_ & 0x00000100) == 0x00000100)) {
+          to_bitField1_ |= 0x00000010;
+        }
+        result.viewTTL_ = viewTTL_;
+        if (((from_bitField1_ & 0x00000200) == 0x00000200)) {
+          to_bitField1_ |= 0x00000020;
+        }
+        result.viewTTLHighWaterMark_ = viewTTLHighWaterMark_;
+        if (((from_bitField1_ & 0x00000400) == 0x00000400)) {
+          to_bitField1_ |= 0x00000040;
+        }
+        result.viewModifiedViewTTL_ = viewModifiedViewTTL_;
         result.bitField0_ = to_bitField0_;
         result.bitField1_ = to_bitField1_;
         onBuilt();
@@ -6236,6 +6398,15 @@ public final class PTableProtos {
         if (other.hasViewModifiedUseStatsForParallelization()) {
           setViewModifiedUseStatsForParallelization(other.getViewModifiedUseStatsForParallelization());
         }
+        if (other.hasViewTTL()) {
+          setViewTTL(other.getViewTTL());
+        }
+        if (other.hasViewTTLHighWaterMark()) {
+          setViewTTLHighWaterMark(other.getViewTTLHighWaterMark());
+        }
+        if (other.hasViewModifiedViewTTL()) {
+          setViewModifiedViewTTL(other.getViewModifiedViewTTL());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -8447,6 +8618,105 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int64 viewTTL = 42;
+      private long viewTTL_ ;
+      /**
+       * <code>optional int64 viewTTL = 42;</code>
+       */
+      public boolean hasViewTTL() {
+        return ((bitField1_ & 0x00000100) == 0x00000100);
+      }
+      /**
+       * <code>optional int64 viewTTL = 42;</code>
+       */
+      public long getViewTTL() {
+        return viewTTL_;
+      }
+      /**
+       * <code>optional int64 viewTTL = 42;</code>
+       */
+      public Builder setViewTTL(long value) {
+        bitField1_ |= 0x00000100;
+        viewTTL_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 viewTTL = 42;</code>
+       */
+      public Builder clearViewTTL() {
+        bitField1_ = (bitField1_ & ~0x00000100);
+        viewTTL_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional int64 viewTTLHighWaterMark = 43;
+      private long viewTTLHighWaterMark_ ;
+      /**
+       * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+       */
+      public boolean hasViewTTLHighWaterMark() {
+        return ((bitField1_ & 0x00000200) == 0x00000200);
+      }
+      /**
+       * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+       */
+      public long getViewTTLHighWaterMark() {
+        return viewTTLHighWaterMark_;
+      }
+      /**
+       * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+       */
+      public Builder setViewTTLHighWaterMark(long value) {
+        bitField1_ |= 0x00000200;
+        viewTTLHighWaterMark_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 viewTTLHighWaterMark = 43;</code>
+       */
+      public Builder clearViewTTLHighWaterMark() {
+        bitField1_ = (bitField1_ & ~0x00000200);
+        viewTTLHighWaterMark_ = 0L;
+        onChanged();
+        return this;
+      }
+
+      // optional bool viewModifiedViewTTL = 44;
+      private boolean viewModifiedViewTTL_ ;
+      /**
+       * <code>optional bool viewModifiedViewTTL = 44;</code>
+       */
+      public boolean hasViewModifiedViewTTL() {
+        return ((bitField1_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <code>optional bool viewModifiedViewTTL = 44;</code>
+       */
+      public boolean getViewModifiedViewTTL() {
+        return viewModifiedViewTTL_;
+      }
+      /**
+       * <code>optional bool viewModifiedViewTTL = 44;</code>
+       */
+      public Builder setViewModifiedViewTTL(boolean value) {
+        bitField1_ |= 0x00000400;
+        viewModifiedViewTTL_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool viewModifiedViewTTL = 44;</code>
+       */
+      public Builder clearViewModifiedViewTTL() {
+        bitField1_ = (bitField1_ & ~0x00000400);
+        viewModifiedViewTTL_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -9116,7 +9386,7 @@ public final class PTableProtos {
       "es\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(\003\022\025\n" +
       "\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePostsCount\030" +
       "\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts" +
-      "\"\243\010\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016" +
+      "\"\357\010\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016" +
       "tableNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162" +
       "\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequ" +
       "enceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013p" +
@@ -9142,12 +9412,14 @@ public final class PTableProtos {
       "ransactionProvider\030& \001(\005\022\032\n\017viewIndexIdT" +
       "ype\030\' \001(\005:\0015\022(\n viewModifiedUpdateCacheF" +
       "requency\030( \001(\010\022.\n&viewModifiedUseStatsFo",
-      "rParallelization\030) \001(\010\"6\n\020EncodedCQCount" +
-      "er\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007counter\030\002 \002(\005*A" +
-      "\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VI" +
-      "EW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache." +
-      "phoenix.coprocessor.generatedB\014PTablePro" +
-      "tosH\001\210\001\001\240\001\001"
+      "rParallelization\030) \001(\010\022\017\n\007viewTTL\030* \001(\003\022" +
+      "\034\n\024viewTTLHighWaterMark\030+ \001(\003\022\033\n\023viewMod" +
+      "ifiedViewTTL\030, \001(\010\"6\n\020EncodedCQCounter\022\021" +
+      "\n\tcolFamily\030\001 \002(\t\022\017\n\007counter\030\002 \002(\005*A\n\nPT" +
+      "ableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002" +
+      "\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoe" +
+      "nix.coprocessor.generatedB\014PTableProtosH" +
+      "\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9171,7 +9443,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisable [...]
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisable [...]
           internal_static_EncodedCQCounter_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_EncodedCQCounter_fieldAccessorTable = new
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9edd8b3..2f072a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -324,6 +324,10 @@ public enum SQLExceptionCode {
             + MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_COL_FAM_PROPERTIES.toString()),
     CANNOT_SET_OR_ALTER_UPDATE_CACHE_FREQ_FOR_INDEX(10950, "44A31", "Cannot set or alter "
             + PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " on an index"),
+    VIEW_TTL_SUPPORTED_FOR_VIEWS_ONLY(10951, "44A32", PhoenixDatabaseMetaData.VIEW_TTL
+            + " property can only be set for views"),
+    CANNOT_SET_OR_ALTER_VIEW_TTL_FOR_TABLE_WITH_TTL(10952, "44A33", "Cannot set or alter "
+            + PhoenixDatabaseMetaData.VIEW_TTL + " property on an table with TTL,"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 56e052d..2e2d30d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -366,7 +366,17 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
     public static final String USE_STATS_FOR_PARALLELIZATION = "USE_STATS_FOR_PARALLELIZATION";
     public static final byte[] USE_STATS_FOR_PARALLELIZATION_BYTES = Bytes.toBytes(USE_STATS_FOR_PARALLELIZATION);
-    
+
+    // The view ttl property will hold the duration after which rows will be marked as expired.
+    public static final long VIEW_TTL_NOT_DEFINED = 0L;
+    public static final String VIEW_TTL = "VIEW_TTL";
+    public static final byte[] VIEW_TTL_BYTES = Bytes.toBytes(VIEW_TTL);
+    // The view ttl high watermark if set indicates the timestamp used for determining the expired rows.
+    // otherwise the now() - ttl-duration is the timestamp used.
+    public static final long MIN_VIEW_TTL_HWM = 0L;
+    public static final String VIEW_TTL_HWM = "VIEW_TTL_HWM";
+    public static final byte[] VIEW_TTL_HWM_BYTES = Bytes.toBytes(VIEW_TTL_HWM);
+
     public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
     public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
     public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 1b04706..29d5842 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -198,6 +198,8 @@ public interface QueryConstants {
             VIEW_TYPE + " UNSIGNED_TINYINT,\n" +
             VIEW_INDEX_ID + " BIGINT,\n" +
             VIEW_INDEX_ID_DATA_TYPE + " INTEGER,\n" +
+            VIEW_TTL + " BIGINT,\n" +
+            VIEW_TTL_HWM + " BIGINT,\n" +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 34a973f..3ce19dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -340,4 +340,14 @@ public class DelegateTable implements PTable {
     @Override public boolean hasViewModifiedUseStatsForParallelization() {
         return delegate.hasViewModifiedUseStatsForParallelization();
     }
+
+    @Override public long getViewTTL() { return delegate.getViewTTL(); }
+
+    @Override public long getViewTTLHighWaterMark() {
+        return delegate.getViewTTLHighWaterMark();
+    }
+
+    @Override public boolean hasViewModifiedViewTTL() {
+        return delegate.hasViewModifiedViewTTL();
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 75b86c3..4d61be4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -93,6 +93,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL_HWM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VIEW_TTL_HWM;
 import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
@@ -309,8 +313,10 @@ public class MetaDataClient {
                     IMMUTABLE_STORAGE_SCHEME + "," +
                     ENCODING_SCHEME + "," +
                     USE_STATS_FOR_PARALLELIZATION +"," +
-                    VIEW_INDEX_ID_DATA_TYPE +
-                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                    VIEW_INDEX_ID_DATA_TYPE +"," +
+                    VIEW_TTL +"," +
+                    VIEW_TTL_HWM +
+                    ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -530,7 +536,7 @@ public class MetaDataClient {
 
     /**
      * Update the cache with the latest as of the connection scn.
-     * @param functioNames
+     * @param functionNames
      * @return the timestamp from the server, negative if the function was added to the cache and positive otherwise
      * @throws SQLException
      */
@@ -1973,10 +1979,46 @@ public class MetaDataClient {
             int baseTableColumnCount =
                     tableType == PTableType.VIEW ? parent.getColumns().size()
                             : QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
+
+            Long viewTTL = VIEW_TTL_NOT_DEFINED;
+            Long viewTTLHighWaterMark = MIN_VIEW_TTL_HWM;
+            Long viewTTLProp = (Long) TableProperty.VIEW_TTL.getValue(tableProps);;
+            // Validate VIEW_TTL prop value if set
+            if (viewTTLProp != null) {
+                if (viewTTLProp < 0) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.ILLEGAL_DATA)
+                            .setMessage(String.format("view = %s, VIEW_TTL value should be > 0", tableName ))
+                            .build()
+                            .buildException();
+                }
+
+                if (tableType == VIEW  && parentPhysicalName != null) {
+                    HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(parentPhysicalName.getBytes());
+                    if (desc != null) {
+                        Integer tableTTLProp = desc.getFamily(SchemaUtil.getEmptyColumnFamily(parent)).getTimeToLive();
+                        if ((tableTTLProp != null) && (tableTTLProp != HConstants.FOREVER)) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_OR_ALTER_VIEW_TTL_FOR_TABLE_WITH_TTL)
+                                    .setMessage(String.format("table = %s, view = %s", parentPhysicalName, tableName ))
+                                    .build()
+                                    .buildException();
+                        }
+                    }
+                }
+
+                if (tableType != VIEW) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.VIEW_TTL_SUPPORTED_FOR_VIEWS_ONLY)
+                            .setSchemaName(schemaName)
+                            .setTableName(tableName)
+                            .build()
+                            .buildException();
+                }
+            }
+
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
                 isImmutableRows = parent.isImmutableRows();
                 isAppendOnlySchema = parent.isAppendOnlySchema();
+                viewTTL = parent.getViewTTL();
 
                 // Index on view
                 // TODO: Can we support a multi-tenant index directly on a multi-tenant
@@ -2108,6 +2150,7 @@ public class MetaDataClient {
             if (tableType != PTableType.INDEX && updateCacheFrequencyProp != null) {
                 updateCacheFrequency = updateCacheFrequencyProp;
             }
+
             String autoPartitionSeq = (String) TableProperty.AUTO_PARTITION_SEQ.getValue(tableProps);
             Long guidePostsWidth = (Long) TableProperty.GUIDE_POSTS_WIDTH.getValue(tableProps);
 
@@ -2288,6 +2331,9 @@ public class MetaDataClient {
                         // set to the parent value if the property is not set on the view
                         updateCacheFrequency = parent.getUpdateCacheFrequency();
                     }
+
+                    viewTTL = viewTTLProp == null ? parent.getViewTTL() : viewTTLProp;
+
                     disableWAL = (disableWALProp == null ? parent.isWALDisabled() : disableWALProp);
                     defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
                     // TODO PHOENIX-4766 Add an options to stop sending parent metadata when creating views
@@ -2646,6 +2692,8 @@ public class MetaDataClient {
                         .setIndexes(Collections.<PTable>emptyList())
                         .setPhysicalNames(ImmutableList.<PName>of())
                         .setColumns(columns.values())
+                        .setViewTTL(VIEW_TTL_NOT_DEFINED)
+                        .setViewTTLHighWaterMark(MIN_VIEW_TTL_HWM)
                         .build();
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
@@ -2835,6 +2883,15 @@ public class MetaDataClient {
                 tableUpsert.setBoolean(28, useStatsForParallelizationProp);
             }
             tableUpsert.setInt(29, viewIndexIdType.getSqlType());
+            if (viewTTL == null || viewTTL == VIEW_TTL_NOT_DEFINED) {
+                tableUpsert.setNull(30, Types.BIGINT);
+                tableUpsert.setNull(31, Types.BIGINT);
+            }
+            else {
+                tableUpsert.setLong(30, viewTTL);
+                tableUpsert.setLong(31, viewTTLHighWaterMark);
+            }
+
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -2988,6 +3045,8 @@ public class MetaDataClient {
                         .setPhysicalNames(physicalNames == null ?
                                 ImmutableList.<PName>of() : ImmutableList.copyOf(physicalNames))
                         .setColumns(columns.values())
+                        .setViewTTL(viewTTL == null ? VIEW_TTL_NOT_DEFINED : viewTTL)
+                        .setViewTTLHighWaterMark(viewTTLHighWaterMark == null ? MIN_VIEW_TTL_HWM : viewTTLHighWaterMark)
                         .setViewModifiedUpdateCacheFrequency(tableType ==  PTableType.VIEW &&
                                 parent != null &&
                                 parent.getUpdateCacheFrequency() != updateCacheFrequency)
@@ -2995,6 +3054,9 @@ public class MetaDataClient {
                                 parent != null &&
                                 parent.useStatsForParallelization()
                                         != useStatsForParallelizationProp)
+                        .setViewModifiedViewTTL(tableType ==  PTableType.VIEW &&
+                                parent != null && viewTTL != null &&
+                                parent.getViewTTL() != viewTTL)
                         .build();
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
@@ -3334,18 +3396,19 @@ public class MetaDataClient {
                 metaPropertiesEvaluated.getGuidePostWidth(),
                 metaPropertiesEvaluated.getAppendOnlySchema(),
                 metaPropertiesEvaluated.getImmutableStorageScheme(),
-                metaPropertiesEvaluated.getUseStatsForParallelization());
+                metaPropertiesEvaluated.getUseStatsForParallelization(),
+                metaPropertiesEvaluated.getViewTTL());
     }
 
-    private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency) throws SQLException {
-        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, updateCacheFrequency, null, null, null, null, -1L, null, null, null);
+    private  long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency, Long viewTTL) throws SQLException {
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, updateCacheFrequency, null, null, null, null, -1L, null, null, null,viewTTL);
     }
 
     private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
             Boolean isTransactional, TransactionFactory.Provider transactionProvider,
             Long updateCacheFrequency, Boolean isImmutableRows, Boolean disableWAL,
             Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema,
-            ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization)
+            ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization, Long viewTTL)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -3398,6 +3461,9 @@ public class MetaDataClient {
         if (useStatsForParallelization != null) {
             mutateBooleanProperty(tenantId, schemaName, tableName, USE_STATS_FOR_PARALLELIZATION, useStatsForParallelization);
         }
+        if (viewTTL != null) {
+            mutateLongProperty(tenantId, schemaName, tableName, VIEW_TTL, viewTTL);
+        }
         return seqNum;
     }
 
@@ -3719,11 +3785,12 @@ public class MetaDataClient {
 
                 if (!table.getIndexes().isEmpty() &&
                         (numPkColumnsAdded>0 || metaProperties.getNonTxToTx() ||
-                                metaPropertiesEvaluated.getUpdateCacheFrequency() != null)) {
+                                metaPropertiesEvaluated.getUpdateCacheFrequency() != null || metaPropertiesEvaluated.getViewTTL() != null)) {
                     for (PTable index : table.getIndexes()) {
                         incrementTableSeqNum(index, index.getType(), numPkColumnsAdded,
                                 metaProperties.getNonTxToTx() ? Boolean.TRUE : null,
-                                metaPropertiesEvaluated.getUpdateCacheFrequency());
+                                metaPropertiesEvaluated.getUpdateCacheFrequency(),
+                                metaPropertiesEvaluated.getViewTTL());
                     }
                     tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
@@ -4094,7 +4161,7 @@ public class MetaDataClient {
                         }
                     }
                     if(!indexColumnsToDrop.isEmpty()) {
-                        long indexTableSeqNum = incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null);
+                        long indexTableSeqNum = incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null, null);
                         dropColumnMutations(index, indexColumnsToDrop);
                         long clientTimestamp = MutationState.getTableTimestamp(timeStamp, connection.getSCN());
                         connection.removeColumn(tenantId, index.getName().getString(),
@@ -4105,7 +4172,7 @@ public class MetaDataClient {
                 tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
 
-                long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size(), null, null);
+                long seqNum = incrementTableSeqNum(table, statement.getTableType(), -tableColumnsToDrop.size(), null, null, null);
                 tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                 connection.rollback();
                 // Force table header to be first in list
@@ -4775,6 +4842,8 @@ public class MetaDataClient {
                         metaProperties.setImmutableStorageSchemeProp((ImmutableStorageScheme)value);
                     } else if (propName.equalsIgnoreCase(USE_STATS_FOR_PARALLELIZATION)) {
                         metaProperties.setUseStatsForParallelizationProp((Boolean)value);
+                    } else if (propName.equalsIgnoreCase(VIEW_TTL)) {
+                        metaProperties.setViewTTL((Long)value);
                     }
                 }
                 // if removeTableProps is true only add the property if it is not a HTable or Phoenix Table property
@@ -4914,6 +4983,20 @@ public class MetaDataClient {
                 metaProperties.setNonTxToTx(true);
             }
         }
+
+        if (metaProperties.getViewTTL() != null) {
+            if (table.getType() != PTableType.VIEW) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.VIEW_TTL_SUPPORTED_FOR_VIEWS_ONLY)
+                        .build()
+                        .buildException();
+            }
+            if (metaProperties.getViewTTL().longValue() != table.getViewTTL()) {
+                metaPropertiesEvaluated.setViewTTL(metaProperties.getViewTTL());
+                changingPhoenixTableProperty = true;
+            }
+        }
+
         return changingPhoenixTableProperty;
     }
 
@@ -4930,6 +5013,7 @@ public class MetaDataClient {
         private ImmutableStorageScheme immutableStorageSchemeProp = null;
         private Boolean useStatsForParallelizationProp = null;
         private boolean nonTxToTx = false;
+        private Long viewTTL = null;
 
         public Boolean getImmutableRowsProp() {
             return isImmutableRowsProp;
@@ -5027,6 +5111,10 @@ public class MetaDataClient {
         public void setNonTxToTx(boolean nonTxToTx) {
             this.nonTxToTx = nonTxToTx;
         }
+
+        public Long getViewTTL() { return viewTTL; }
+
+        public void setViewTTL(Long viewTTL) { this.viewTTL = viewTTL; }
     }
 
     class MetaPropertiesEvaluated{
@@ -5041,6 +5129,7 @@ public class MetaDataClient {
         private Boolean useStatsForParallelization = null;
         private Boolean isTransactional = null;
         private TransactionFactory.Provider transactionProvider = null;
+        private Long viewTTL = null;
 
         public Boolean getIsImmutableRows() {
             return isImmutableRows;
@@ -5130,5 +5219,8 @@ public class MetaDataClient {
             this.transactionProvider = transactionProvider;
         }
 
+        public Long getViewTTL() { return viewTTL; }
+
+        public void setViewTTL(Long viewTTL) { this.viewTTL = viewTTL; }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 6c4d3a3..c3461a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -802,6 +802,21 @@ public interface PTable extends PMetaDataEntity {
     boolean hasViewModifiedUseStatsForParallelization();
 
     /**
+     * @return The VIEW_TTL duration associated with the view.
+     */
+    long getViewTTL();
+
+    /**
+     * @return The VIEW_TTL high water mark timestamp associated with the view.
+     */
+    long getViewTTLHighWaterMark();
+
+    /**
+     * @return If the view has overridden the TTL set at the parent view level.
+     */
+    boolean hasViewModifiedViewTTL();
+
+    /**
      * Class to help track encoded column qualifier counters per column family.
      */
     public class EncodedCQCounter {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 5c0e12c..ef79321 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -22,6 +22,8 @@ import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.addQuietly;
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.deleteQuietly;
 import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VIEW_TTL_HWM;
 
 import java.io.IOException;
 import java.sql.DriverManager;
@@ -65,6 +67,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
@@ -109,6 +112,7 @@ public class PTableImpl implements PTable {
     private static final Integer NO_SALTING = -1;
     private static final int VIEW_MODIFIED_UPDATE_CACHE_FREQUENCY_BIT_SET_POS = 0;
     private static final int VIEW_MODIFIED_USE_STATS_FOR_PARALLELIZATION_BIT_SET_POS = 1;
+    private static final int VIEW_MODIFIED_VIEW_TTL_BIT_SET_POS = 2;
 
     private IndexMaintainer indexMaintainer;
     private ImmutableBytesWritable indexMaintainersPtr;
@@ -167,6 +171,8 @@ public class PTableImpl implements PTable {
     private final QualifierEncodingScheme qualifierEncodingScheme;
     private final EncodedCQCounter encodedCQCounter;
     private final Boolean useStatsForParallelization;
+    private final long viewTTL;
+    private final long viewTTLHighWaterMark;
     private final BitSet viewModifiedPropSet;
 
     public static class Builder {
@@ -222,8 +228,11 @@ public class PTableImpl implements PTable {
         private QualifierEncodingScheme qualifierEncodingScheme;
         private EncodedCQCounter encodedCQCounter;
         private Boolean useStatsForParallelization;
+        private long viewTTL;
+        private long viewTTLHighWaterMark;
+
         // Used to denote which properties a view has explicitly modified
-        private BitSet viewModifiedPropSet = new BitSet(2);
+        private BitSet viewModifiedPropSet = new BitSet(3);
         // Optionally set columns for the builder, but not for the actual PTable
         private Collection<PColumn> columns;
 
@@ -510,6 +519,22 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        public Builder setViewTTL(long viewTTL) {
+            this.viewTTL = viewTTL;
+            return this;
+        }
+
+        public Builder setViewTTLHighWaterMark(long viewTTLHighWaterMark) {
+            this.viewTTLHighWaterMark = viewTTLHighWaterMark;
+            return this;
+        }
+
+        public Builder setViewModifiedViewTTL(boolean modified) {
+            this.viewModifiedPropSet.set(VIEW_MODIFIED_VIEW_TTL_BIT_SET_POS,
+                    modified);
+            return this;
+        }
+
         /**
          * Note: When set in the builder, we must call {@link Builder#initDerivedAttributes()}
          * before building the PTable in order to correctly populate other attributes of the PTable
@@ -780,6 +805,8 @@ public class PTableImpl implements PTable {
         this.qualifierEncodingScheme = builder.qualifierEncodingScheme;
         this.encodedCQCounter = builder.encodedCQCounter;
         this.useStatsForParallelization = builder.useStatsForParallelization;
+        this.viewTTL = builder.viewTTL;
+        this.viewTTLHighWaterMark = builder.viewTTLHighWaterMark;
         this.viewModifiedPropSet = builder.viewModifiedPropSet;
     }
 
@@ -847,7 +874,10 @@ public class PTableImpl implements PTable {
                         ImmutableList.<PName>of() : ImmutableList.copyOf(table.getPhysicalNames()))
                 .setViewModifiedUseStatsForParallelization(table
                         .hasViewModifiedUseStatsForParallelization())
-                .setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency());
+                .setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency())
+                .setViewModifiedViewTTL(table.hasViewModifiedViewTTL())
+                .setViewTTL(table.getViewTTL())
+                .setViewTTLHighWaterMark(table.getViewTTLHighWaterMark());
     }
 
     @Override
@@ -1664,15 +1694,28 @@ public class PTableImpl implements PTable {
         if (table.hasUseStatsForParallelization()) {
             useStatsForParallelization = table.getUseStatsForParallelization();
         }
+        long viewTTL = VIEW_TTL_NOT_DEFINED;
+        if (table.hasViewTTL()) {
+            viewTTL = table.getViewTTL();
+        }
+        long viewTTLHighWaterMark = MIN_VIEW_TTL_HWM;
+        if (table.hasViewTTLHighWaterMark()) {
+            viewTTLHighWaterMark = table.getViewTTLHighWaterMark();
+        }
+
         // for older clients just use the value of the properties that are set on the view
         boolean viewModifiedUpdateCacheFrequency = true;
         boolean viewModifiedUseStatsForParallelization = true;
+        boolean viewModifiedViewTTL = true;
         if (table.hasViewModifiedUpdateCacheFrequency()) {
             viewModifiedUpdateCacheFrequency = table.getViewModifiedUpdateCacheFrequency();
         }
         if (table.hasViewModifiedUseStatsForParallelization()) {
             viewModifiedUseStatsForParallelization = table.getViewModifiedUseStatsForParallelization();
         }
+        if (table.hasViewModifiedViewTTL()) {
+            viewModifiedViewTTL = table.getViewModifiedViewTTL();
+        }
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -1703,6 +1746,8 @@ public class PTableImpl implements PTable {
                     .setBaseColumnCount(baseColumnCount)
                     .setEncodedCQCounter(encodedColumnQualifierCounter)
                     .setUseStatsForParallelization(useStatsForParallelization)
+                    .setViewTTL(viewTTL)
+                    .setViewTTLHighWaterMark(viewTTLHighWaterMark)
                     .setExcludedColumns(ImmutableList.<PColumn>of())
                     .setTenantId(tenantId)
                     .setSchemaName(schemaName)
@@ -1719,6 +1764,7 @@ public class PTableImpl implements PTable {
                     .setColumns(columns)
                     .setViewModifiedUpdateCacheFrequency(viewModifiedUpdateCacheFrequency)
                     .setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
+                    .setViewModifiedViewTTL(viewModifiedViewTTL)
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -1822,8 +1868,11 @@ public class PTableImpl implements PTable {
       if (table.useStatsForParallelization() != null) {
           builder.setUseStatsForParallelization(table.useStatsForParallelization());
       }
+      builder.setViewTTL(table.getViewTTL());
+      builder.setViewTTLHighWaterMark(table.getViewTTLHighWaterMark());
       builder.setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency());
       builder.setViewModifiedUseStatsForParallelization(table.hasViewModifiedUseStatsForParallelization());
+      builder.setViewModifiedViewTTL(table.hasViewModifiedViewTTL());
       return builder.build();
     }
 
@@ -1919,6 +1968,16 @@ public class PTableImpl implements PTable {
         return useStatsForParallelization;
     }
 
+    @Override
+    public long getViewTTL() {
+        return viewTTL;
+    }
+
+    @Override
+    public long getViewTTLHighWaterMark() {
+        return viewTTLHighWaterMark;
+    }
+
     @Override public boolean hasViewModifiedUpdateCacheFrequency() {
         return viewModifiedPropSet.get(VIEW_MODIFIED_UPDATE_CACHE_FREQUENCY_BIT_SET_POS);
     }
@@ -1927,6 +1986,10 @@ public class PTableImpl implements PTable {
         return viewModifiedPropSet.get(VIEW_MODIFIED_USE_STATS_FOR_PARALLELIZATION_BIT_SET_POS);
     }
 
+    @Override public boolean hasViewModifiedViewTTL() {
+        return viewModifiedPropSet.get(VIEW_MODIFIED_VIEW_TTL_BIT_SET_POS);
+    }
+
     private static final class KVColumnFamilyQualifier {
         @Nonnull
         private final String colFamilyName;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 5e8324e..1ebba0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -24,11 +24,13 @@ import static org.apache.phoenix.exception.SQLExceptionCode.DEFAULT_COLUMN_FAMIL
 import static org.apache.phoenix.exception.SQLExceptionCode.SALT_ONLY_ON_CREATE_TABLE;
 import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TTL_NOT_DEFINED;
 
 import java.sql.SQLException;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -237,6 +239,37 @@ public enum TableProperty {
         public Object getPTableValue(PTable table) {
             return table.useStatsForParallelization();
         }
+    },
+
+    VIEW_TTL(PhoenixDatabaseMetaData.VIEW_TTL, true, true, true) {
+        /**
+         * VIEW_TTL can take any values ranging between 0 < VIEW_TTL <= HConstants.LATEST_TIMESTAMP.
+         * special values :-
+         * NONE or 0L => Not Defined.
+         * FOREVER => HConstants.LATEST_TIMESTAMP
+         *
+         * @param value
+         * @return
+         */
+        @Override
+        public Object getValue(Object value) {
+            if (value instanceof String) {
+                String strValue = (String) value;
+                if ("FOREVER".equalsIgnoreCase(strValue)) {
+                    return HConstants.LATEST_TIMESTAMP;
+                } else if ("NONE".equalsIgnoreCase(strValue)) {
+                    return VIEW_TTL_NOT_DEFINED;
+                }
+            } else {
+                return value == null ? null : ((Number) value).longValue();
+            }
+            return value;
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getViewTTL();
+        }
     }
     ;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
index d08a9d9..22e3a40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ViewUtil.java
@@ -595,6 +595,7 @@ public class ViewUtil {
             PTable parent) {
         byte[] parentUpdateCacheFreqBytes = null;
         byte[] parentUseStatsForParallelizationBytes = null;
+        byte[] parentViewTTLBytes = null;
         if (parent != null) {
             parentUpdateCacheFreqBytes = new byte[PLong.INSTANCE.getByteSize()];
             PLong.INSTANCE.getCodec().encodeLong(parent.getUpdateCacheFrequency(),
@@ -603,6 +604,9 @@ public class ViewUtil {
                 parentUseStatsForParallelizationBytes =
                         PBoolean.INSTANCE.toBytes(parent.useStatsForParallelization());
             }
+            parentViewTTLBytes = new byte[PLong.INSTANCE.getByteSize()];
+            PLong.INSTANCE.getCodec().encodeLong(parent.getViewTTL(),
+                    parentViewTTLBytes, 0);
         }
         for (Mutation m: tableMetaData) {
             if (m instanceof Put) {
@@ -616,6 +620,11 @@ public class ViewUtil {
                         PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION_BYTES,
                         parentUseStatsForParallelizationBytes,
                         MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES);
+                MetaDataUtil.conditionallyAddTagsToPutCells((Put)m,
+                        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.VIEW_TTL_BYTES,
+                        parentViewTTLBytes,
+                        MetaDataEndpointImpl.VIEW_MODIFIED_PROPERTY_BYTES);
             }
 
         }
diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto
index 3e8981c..d77a583 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -106,6 +106,9 @@ message PTable {
   optional int32 viewIndexIdType = 39 [default = 5];
   optional bool viewModifiedUpdateCacheFrequency = 40;
   optional bool viewModifiedUseStatsForParallelization = 41;
+  optional int64 viewTTL = 42;
+  optional int64 viewTTLHighWaterMark = 43;
+  optional bool viewModifiedViewTTL = 44;
 }
 
 message EncodedCQCounter {