Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/01/13 00:47:55 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new 7673e47  PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells
7673e47 is described below

commit 7673e47ad1c29319f3149f188f2b4b6e51b9b22d
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Thu Dec 19 18:14:13 2019 -0800

    PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells
---
 .../org/apache/phoenix/end2end/MaxLookbackIT.java  | 350 +++++++++++++++++++++
 .../it/java/org/apache/phoenix/end2end/SCNIT.java  |  72 ++++-
 .../end2end/index/GlobalIndexCheckerIT.java        |  68 ++--
 .../hadoop/hbase/regionserver/ScanInfoUtil.java    |  86 ++++-
 .../org/apache/phoenix/compile/QueryCompiler.java  |  51 +++
 .../coprocessor/BaseScannerRegionObserver.java     | 121 ++++++-
 .../UngroupedAggregateRegionObserver.java          |   9 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   4 +
 .../apache/phoenix/index/GlobalIndexChecker.java   |   1 +
 .../org/apache/phoenix/util/TransactionUtil.java   |   2 +-
 .../java/org/apache/phoenix/util/TestUtil.java     | 136 +++++++-
 11 files changed, 840 insertions(+), 60 deletions(-)
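
For readers skimming the patch: the new behavior is driven by the phoenix.max.lookback.age.seconds
property (ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, added below). As a minimal, hypothetical
sketch that is not part of the commit, a client that sets CurrentSCN older than the configured
lookback window now fails at query compile time with the new error 538
(CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE). The JDBC URL and table name below are
placeholders, and the usual java.sql and org.apache.phoenix.util imports are assumed.

    // assumes phoenix.max.lookback.age.seconds = 10 on the cluster
    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
        Long.toString(System.currentTimeMillis() - 11 * 1000L)); // older than the 10s window
    try (Connection conn = DriverManager.getConnection(jdbcUrl, props);
         Statement stmt = conn.createStatement()) {
        stmt.executeQuery("SELECT * FROM MY_TABLE");
    } catch (SQLException e) {
        // expected: error code 538, "Cannot use SCN to look further back in the past
        // beyond the configured max lookback age"
    }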

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
new file mode 100644
index 0000000..9215b44
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.assertRawCellCount;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
+import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+
+@NeedsOwnMiniClusterTest
+public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class);
+    private static final int MAX_LOOKBACK_AGE = 10;
+    private static final int ROWS_POPULATED = 2;
+    private String tableDDLOptions;
+    private StringBuilder optionBuilder;
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void beforeTest(){
+        optionBuilder = new StringBuilder();
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
+    @Test
+    public void testTooLowSCNWithMaxLookbackAge() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexStem = generateUniqueName();
+            createTableAndIndexes(conn, dataTableName, indexStem);
+            //need to sleep long enough for the SCN to still find the syscat row for the table
+            Thread.sleep(MAX_LOOKBACK_AGE * 1000 + 1000);
+            Properties props = new Properties();
+            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                Long.toString(EnvironmentEdgeManager.currentTime() - (MAX_LOOKBACK_AGE + 1) * 1000));
+            try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+                connscn.createStatement().executeQuery("select * from " + dataTableName);
+            } catch (SQLException se) {
+                SQLExceptionCode code =
+                    SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
+                TestUtil.assertSqlExceptionCode(code, se);
+                return;
+            }
+        }
+        Assert.fail("We should have thrown an exception for the too-early SCN");
+    }
+
+    @Test(timeout=120000L)
+    public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexStem = generateUniqueName();
+            createTableAndIndexes(conn, dataTableName, indexStem);
+            String fullIndexName = indexStem + "1";
+            TableName dataTable = TableName.valueOf(dataTableName);
+            TableName indexTable = TableName.valueOf(fullIndexName);
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+            assertTableHasTtl(conn, indexTable, Integer.MAX_VALUE);
+            long beforeDeleteSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            Thread.sleep(1); //make sure we delete at a different ts
+            Statement stmt = conn.createStatement();
+            stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
+            Assert.assertEquals(1, stmt.getUpdateCount());
+            conn.commit();
+            //select stmt to get row we deleted
+            String sql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName);
+            assertExplainPlan(conn, sql, dataTableName, fullIndexName);
+            int rowsPlusDeleteMarker = ROWS_POPULATED;
+            assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            flush(dataTable);
+            flush(indexTable);
+            assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTime();
+            Thread.sleep(1);
+            majorCompact(indexTable, beforeFirstCompactSCN);
+            assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            //wait for the lookback time. After this compactions should purge the deleted row
+            Thread.sleep(MAX_LOOKBACK_AGE * 1000);
+            long beforeSecondCompactSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            String notDeletedRowSql =
+                String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName);
+            assertExplainPlan(conn, notDeletedRowSql, dataTableName, fullIndexName);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+            assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+            conn.createStatement().execute("upsert into " + dataTableName +
+                " values ('c', 'cd', 'cde', 'cdef')");
+            conn.commit();
+            majorCompact(indexTable, beforeSecondCompactSCN);
+            majorCompact(dataTable, beforeSecondCompactSCN);
+            assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+            //deleted row should be gone, but the non-deleted row should still be there.
+            assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            //1 deleted row should be gone
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+        }
+    }
+
+    @Test(timeout=60000L)
+    public void testTTLAndMaxLookbackAge() throws Exception {
+        int ttl = 10;
+        optionBuilder.append("TTL=" + ttl);
+        tableDDLOptions = optionBuilder.toString();
+        Configuration conf = getUtility().getConfiguration();
+        //disable automatic memstore flushes
+        long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+            HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+        conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexStem = generateUniqueName();
+            createTableAndIndexes(conn, dataTableName, indexStem);
+            long afterFirstInsertSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            TableName dataTable = TableName.valueOf(dataTableName);
+            assertTableHasTtl(conn, dataTable, ttl);
+            String fullIndexName = indexStem + "1";
+            TableName indexTable = TableName.valueOf(fullIndexName);
+            assertTableHasTtl(conn, indexTable, ttl);
+
+            //first make sure we inserted correctly
+            String sql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+            assertExplainPlan(conn, sql, dataTableName, fullIndexName);
+            assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
+            int originalRowCount = 2;
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //force a flush
+            flush(indexTable);
+            //flush shouldn't have changed it
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //now wait the TTL
+            Thread.sleep((ttl +1) * 1000);
+            long afterTTLExpiresSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            assertExplainPlan(conn, sql, dataTableName, fullIndexName);
+            //make sure we can't see it after expiration from masking
+            assertRowExistsAtSCN(getUrl(), sql, afterTTLExpiresSCN, false);
+            //but it's still on disk
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            long beforeMajorCompactSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            majorCompact(indexTable, beforeMajorCompactSCN);
+            assertRawRowCount(conn, indexTable, 0);
+        } finally{
+            conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+        }
+    }
+
+    @Test
+    public void testRecentMaxVersionsNotCompactedAway() throws Exception {
+        int versions = 2;
+        optionBuilder.append("VERSIONS=" + versions);
+        tableDDLOptions = optionBuilder.toString();
+        String firstValue = "abc";
+        String secondValue = "def";
+        String thirdValue = "ghi";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexStem = generateUniqueName();
+            createTableAndIndexes(conn, dataTableName, indexStem, versions);
+            long afterInsertSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            //make sure table and index metadata is set up right for versions
+            TableName dataTable = TableName.valueOf(dataTableName);
+            assertTableHasVersions(conn, dataTable, versions);
+            String fullIndexName = indexStem + "1";
+            TableName indexTable = TableName.valueOf(fullIndexName);
+            assertTableHasVersions(conn, indexTable, versions);
+            //check query optimizer is doing what we expect
+            String dataTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+            String indexTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+            assertExplainPlan(conn, indexTableSelectSql, dataTableName, fullIndexName);
+            //make sure the data was inserted correctly in the first place
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, afterInsertSCN, firstValue);
+            assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, afterInsertSCN, firstValue);
+            //force first update to get a distinct ts
+            Thread.sleep(1);
+            updateColumn(conn, dataTableName, "id", "a", "val2", secondValue);
+            long afterFirstUpdateSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            //force second update to get a distinct ts
+            Thread.sleep(1);
+            updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue);
+            long afterSecondUpdateSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            //check to make sure we can see all three versions at the appropriate times
+            String[] allValues = {firstValue, secondValue, thirdValue};
+            long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN};
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+
+            flush(dataTable);
+            flush(indexTable);
+            //after flush, check to make sure we can see all three versions at the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+            majorCompact(dataTable, afterSecondUpdateSCN);
+            majorCompact(indexTable, afterSecondUpdateSCN);
+            //after major compaction, check to make sure we can see all three versions
+            // at the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+            Thread.sleep(MAX_LOOKBACK_AGE * 1000);
+            long afterLookbackAgeSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
+            majorCompact(dataTable, afterLookbackAgeSCN);
+            majorCompact(indexTable, afterLookbackAgeSCN);
+            //empty column, 1 version of val1, 3 versions of val2, 1 version of val3 = 6
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
+            //2 versions of empty column, 2 versions of val2,
+            // 2 versions of val3 (since we write whole rows to index) = 6
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
+            //empty column + 1 version each of val1, val2 and val3 = 4
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
+            //1 version of empty column, 1 version of val2, 1 version of val3 = 3
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3);
+        }
+    }
+
+    private void flush(TableName table) throws IOException {
+        Admin admin = getUtility().getHBaseAdmin();
+        admin.flush(table);
+    }
+
+    private void majorCompact(TableName table, long compactionRequestedSCN) throws Exception {
+        Admin admin = getUtility().getHBaseAdmin();
+        admin.majorCompact(table);
+        long lastCompactionTimestamp;
+        AdminProtos.GetRegionInfoResponse.CompactionState state = null;
+        while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table)) < compactionRequestedSCN
+            || (state = admin.getCompactionState(table)).
+            equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Last compaction time:" + lastCompactionTimestamp);
+                LOG.trace("CompactionState: " + state);
+            }
+            Thread.sleep(100);
+        }
+    }
+
+    public static void assertExplainPlan(Connection conn, String selectSql,
+                                         String dataTableFullName, String indexTableFullName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+        IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName);
+    }
+
+    private void assertMultiVersionLookbacks(String dataTableSelectSql,
+                                             String[] values, long[] scns)
+        throws Exception {
+        //make sure we can still look back after updating
+        for (int k = 0; k < values.length; k++){
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, scns[k], values[k]);
+        }
+    }
+
+    private void updateColumn(Connection conn, String dataTableName,
+                              String idColumn, String id, String valueColumn, String value)
+        throws SQLException {
+        String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
+            dataTableName, idColumn, valueColumn, id, value);
+        conn.createStatement().execute(upsertSql);
+        conn.commit();
+    }
+
+    private void createTableAndIndexes(Connection conn, String dataTableName,
+                                       String indexTableName) throws Exception {
+        createTableAndIndexes(conn, dataTableName, indexTableName, 1);
+    }
+
+    private void createTableAndIndexes(Connection conn, String dataTableName,
+                                       String indexTableName, int indexVersions) throws Exception {
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
+            dataTableName + " (val1) include (val2, val3)" +
+            " VERSIONS=" + indexVersions);
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
+            dataTableName + " (val2) include (val1, val3)" +
+            " VERSIONS=" + indexVersions);
+        conn.commit();
+    }
+
+    private void populateTable(String tableName) throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        String createSql = "create table " + tableName +
+            " (id varchar(10) not null primary key, val1 varchar(10), " +
+            "val2 varchar(10), val3 varchar(10))" + tableDDLOptions;
+        conn.createStatement().execute(createSql);
+        conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+        conn.commit();
+        conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+        conn.commit();
+        conn.close();
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
index 6c45b06..d79752e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
@@ -24,11 +24,17 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class SCNIT extends ParallelStatsDisabledIT {
@@ -67,7 +73,7 @@ public class SCNIT extends ParallelStatsDisabledIT {
 			rs.close();
 		}
 		props.clear();
-		props.setProperty("CurrentSCN", Long.toString(timeAfterDelete));
+		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete));
 		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
 			ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
 			assertTrue(rs.next());
@@ -82,28 +88,68 @@ public class SCNIT extends ParallelStatsDisabledIT {
 
 	@Test
 	public void testSCNWithTTL() throws Exception {
+		int ttl = 2;
+		String fullTableName = createTableWithTTL(ttl);
+		//sleep for one second longer than ttl
+		Thread.sleep(ttl * 1000 + 1000);
+		Properties props = new Properties();
+		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+				Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
+		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+			ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
+			assertFalse(rs.next());
+			rs.close();
+		}
+	}
+
+	@Test
+	public void testTooLowSCNWithTTL() throws Exception {
+		//if scn is for an older time than a table's ttl, it should throw a SQLException
+		int ttl = 2;
+		String fullTableName = createTableWithTTL(ttl);
+		int sleepTime = (ttl + 1)* 1000;
+		//need to sleep long enough for the SCN to still find the syscat row for the table
+		Thread.sleep(sleepTime);
+		Properties props = new Properties();
+		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+			Long.toString(EnvironmentEdgeManager.currentTime() - sleepTime));
+		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+			connscn.createStatement().
+				executeQuery(String.format("select * from %s", fullTableName));
+		} catch (SQLException se){
+			SQLExceptionCode code = SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL;
+			assertSqlExceptionCode(code, se);
+			return;
+		}
+		Assert.fail("We should have thrown an exception for the too-early SCN");
+	}
+
+	private void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) {
+		assertEquals(code.getErrorCode(), se.getErrorCode());
+		assertTrue("Wrong error message", se.getMessage().contains(code.getMessage()));
+		assertEquals(code.getSQLState(), se.getSQLState());
+	}
+
+	private String createTableWithTTL(int ttl) throws SQLException, InterruptedException {
 		String schemaName = generateUniqueName();
 		String tableName = generateUniqueName();
+		StringBuilder optionsBuilder = new StringBuilder();
+		if (ttl > 0){
+			optionsBuilder.append("TTL=");
+			optionsBuilder.append(ttl);
+		}
+		String ddlOptions = optionsBuilder.toString();
 		String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
 		try (Connection conn = DriverManager.getConnection(getUrl())) {
 			conn.createStatement()
-					.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) TTL=2");
+					.execute(String.format("CREATE TABLE %s" +
+						"(k VARCHAR PRIMARY KEY, v VARCHAR) %s", fullTableName, ddlOptions));
 			conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
 			conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
 			conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
 			conn.commit();
-			// TTL is 2 sec
-			Thread.sleep(3000);
-		}
-
-		Properties props = new Properties();
-		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
-				Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
-		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
-			ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
-			assertFalse(rs.next());
-			rs.close();
 		}
+		return fullTableName;
 	}
 
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 5861323..a825826 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,15 +54,12 @@ import com.google.common.collect.Lists;
 public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     private static final Log LOG = LogFactory.getLog(GlobalIndexCheckerIT.class);
     private final boolean async;
-    private final String tableDDLOptions;
-
+    private String tableDDLOptions;
+    private StringBuilder optionBuilder;
+    private final boolean encoded;
     public GlobalIndexCheckerIT(boolean async, boolean encoded) {
         this.async = async;
-        StringBuilder optionBuilder = new StringBuilder();
-        if (!encoded) {
-            optionBuilder.append(" COLUMN_ENCODED_BYTES=0 ");
-        }
-        this.tableDDLOptions = optionBuilder.toString();
+        this.encoded = encoded;
     }
 
     @BeforeClass
@@ -71,6 +69,15 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
+    @Before
+    public void beforeTest(){
+        optionBuilder = new StringBuilder();
+        if (!encoded) {
+            optionBuilder.append(" COLUMN_ENCODED_BYTES=0");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
     @Parameters(
             name = "async={0},encoded={1}")
     public static synchronized Collection<Object[]> data() {
@@ -305,17 +312,8 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     public void testOnePhaseOverwrite() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
-            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
             String indexTableName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
-                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
-                    dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
-            if (async) {
-                // run the index MR job.
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
-            }
+            createTableAndIndexes(conn, dataTableName, indexTableName);
             // Configure IndexRegionObserver to skip the last two write phase (i.e., the data table update and post index
             // update phase) and check that this does not impact the correctness (one overwrite)
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -385,21 +383,35 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private void createTableAndIndexes(Connection conn, String dataTableName,
+                                       String indexTableName) throws Exception {
+        createTableAndIndexes(conn, dataTableName, indexTableName, 1);
+    }
+
+    private void createTableAndIndexes(Connection conn, String dataTableName,
+                                       String indexTableName, int indexVersions) throws Exception {
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
+                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") +
+            " VERSIONS=" + indexVersions);
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
+                dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")+
+            " VERSIONS=" + indexVersions);
+        conn.commit();
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
+        }
+    }
+
     @Test
     public void testFailDataTableAndPostIndexRowUpdate() throws Exception {
-        String dataTableName = generateUniqueName();
-        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+
         try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
             String indexName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
-                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
-            conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
-                    dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
-            if (async) {
-                // run the index MR job.
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
-            }
+            createTableAndIndexes(conn, dataTableName, indexName);
             // Configure IndexRegionObserver to fail the last two write phase (i.e., the data table update and post index update phase)
             // and check that this does not impact the correctness
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
index e0d62a2..6854a75 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -20,28 +20,108 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.NavigableSet;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.client.Scan;
 
 public class ScanInfoUtil {
+    public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+        "phoenix.max.lookback.age.seconds";
+    public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
     private ScanInfoUtil() {
     }
-    
+
     public static boolean isKeepDeletedCells(ScanInfo scanInfo) {
         return scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE;
     }
-    
+
     public static ScanInfo cloneScanInfoWithKeepDeletedCells(ScanInfo scanInfo) {
         return new ScanInfo(scanInfo.getConfiguration(), scanInfo.getFamily(), scanInfo.getMinVersions(),
                     scanInfo.getMaxVersions(), scanInfo.getTtl(), KeepDeletedCells.TRUE,
                     scanInfo.getTimeToPurgeDeletes(), scanInfo.getComparator());
     }
 
-    public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,long readPt) throws IOException {
+    public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+                                                  final NavigableSet<byte[]> columns,long readPt)
+        throws IOException {
         if(!scan.isReversed()) {
             return new StoreScanner(store, scanInfo, scan, columns,readPt);
         } else {
             return new ReversedStoreScanner(store, scanInfo, scan, columns,readPt);
         }
     }
+
+    public static long getTimeToLiveForCompactions(HColumnDescriptor columnDescriptor,
+                                                   ScanInfo scanInfo) {
+        long ttl = scanInfo.getTtl();
+        long maxLookbackTtl = getMaxLookback(scanInfo.getConfiguration());
+        if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
+            if (ttl == Long.MAX_VALUE
+                && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
+                // If the user kept the default TTL (FOREVER) and KeepDeletedCells is FALSE or
+                // TTL, use the max lookback age as the ttl so unwanted delete markers still get removed
+                ttl = maxLookbackTtl;
+            } else {
+                //if there is a TTL, use TTL instead of max lookback age.
+                // Max lookback age should be more recent or equal to TTL
+                ttl = Math.max(ttl, maxLookbackTtl);
+            }
+        }
+
+        return ttl;
+    }
+
+    /*
+     * For KeepDeletedCells.FALSE or KeepDeletedCells.TTL,
+     * let delete markers age out once the lookback age has passed.
+     */
+    private static KeepDeletedCells getKeepDeletedCells(final Store store, ScanType scanType) {
+        //if we're doing a minor compaction or flush, always set keep deleted cells
+        //to true. Otherwise, if keep deleted cells is false or TTL, use KeepDeletedCells TTL,
+        //where the value of the ttl might be overridden to the max lookback age elsewhere
+        return (store.getFamily().getKeepDeletedCells() == KeepDeletedCells.TRUE
+            || scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ?
+            KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
+    }
+
+    /*
+     * If the user set a TTL, we should leave MIN_VERSIONS at the default (0 in most cases).
+     * Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want
+     * Math.max(maxVersions, minVersions, 1)
+     */
+    private static int getMinVersions(ScanInfo oldScanInfo, final Store store) {
+        return oldScanInfo.getTtl() != Long.MAX_VALUE ? store.getFamily().getMinVersions()
+            : Math.max(Math.max(store.getFamily().getMinVersions(),
+            store.getFamily().getMaxVersions()),1);
+    }
+
+    public static ScanInfo getScanInfoForFlushesAndCompactions(Configuration conf,
+                                                               ScanInfo oldScanInfo,
+                                                         final Store store,
+                                                         ScanType type) {
+        long ttl = getTimeToLiveForCompactions(store.getFamily(), oldScanInfo);
+        KeepDeletedCells keepDeletedCells = getKeepDeletedCells(store, type);
+        int minVersions = getMinVersions(oldScanInfo, store);
+        return new ScanInfo(conf,store.getFamily().getName(), minVersions,
+            Integer.MAX_VALUE, ttl, keepDeletedCells,
+            oldScanInfo.getTimeToPurgeDeletes(),
+            oldScanInfo.getComparator());
+    }
+
+    private static long getMaxLookback(Configuration conf){
+        //config param is in seconds, switch to millis
+        return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+    }
+
+    public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+        return isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
+    }
+
+    private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+        return maxLookbackTime > 0L;
+    }
 }
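
A hedged summary of the effective TTL that getTimeToLiveForCompactions above works out to,
assuming a 10-second max lookback age (the numbers are illustrative, not from the commit):

    // ScanInfo's TTL is in milliseconds; the config value is in seconds and converted to ms.
    //   TTL = FOREVER and KeepDeletedCells != TRUE -> effective ttl = 10,000 ms (the lookback window)
    //   explicit TTL, e.g. 60s                     -> effective ttl = max(60,000, 10,000) = 60,000 ms
    //   max lookback disabled (<= 0)               -> ttl returned unchanged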
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index e149c77..4722bd6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -28,13 +28,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
 import org.apache.phoenix.compile.JoinCompiler.JoinTable;
 import org.apache.phoenix.compile.JoinCompiler.Table;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
 import org.apache.phoenix.execute.ClientScanPlan;
@@ -68,6 +73,7 @@ import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -76,6 +82,7 @@ import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -150,6 +157,7 @@ public class QueryCompiler {
      * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
      */
     public QueryPlan compile() throws SQLException{
+        verifySCN();
         QueryPlan plan;
         if (select.isUnion()) {
             plan = compileUnionAll(select);
@@ -159,6 +167,49 @@ public class QueryCompiler {
         return plan;
     }
 
+    private void verifySCN() throws SQLException {
+        PhoenixConnection conn = statement.getConnection();
+        Long scn = conn.getSCN();
+        if (scn == null) {
+            return;
+        }
+        List<TableRef> scnTooOldTableRefs = new ArrayList<TableRef>();
+        ColumnResolver resolver =
+            FromCompiler.getResolverForQuery(select, conn);
+        List<TableRef> involvedTables = resolver.getTables();
+        int maxLookBackAge = conn.getQueryServices().
+            getConfiguration().getInt(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE);
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        if (maxLookBackAge > 0 && now - maxLookBackAge * 1000L > scn){
+            throw new SQLExceptionInfo.Builder(
+                SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE)
+                .build().buildException();
+        }
+        for (TableRef tableRef : involvedTables) {
+            byte[] tableQualifier = tableRef.getTable().getPhysicalName().getBytes();
+            //we can have a tableRef with an empty table, such as with sequences
+            if (tableQualifier.length > 0) {
+                HTableDescriptor td = conn.getQueryServices().getTableDescriptor(tableQualifier);
+                HColumnDescriptor cd = td.getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+                now = EnvironmentEdgeManager.currentTimeMillis();
+                if (now - cd.getTimeToLive() * 1000L > scn) {
+                    scnTooOldTableRefs.add(tableRef);
+                }
+            }
+        }
+        if (scnTooOldTableRefs.size() > 0) {
+            TableRef tableRef = scnTooOldTableRefs.get(0);
+            throw new SQLExceptionInfo.Builder(
+                SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL)
+                .setSchemaName(tableRef.getTable().getSchemaName().getString())
+                .setTableName(tableRef.getTable().getTableName().getString())
+                .build()
+                .buildException();
+        }
+
+    }
+
     public QueryPlan compileUnionAll(SelectStatement select) throws SQLException { 
         List<SelectStatement> unionAllSelects = select.getSelects();
         List<QueryPlan> plans = new ArrayList<QueryPlan>();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 585c17f..71e7546 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -18,13 +18,18 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -32,14 +37,17 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -56,6 +64,7 @@ import org.apache.phoenix.util.TransactionUtil;
 
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
+    private static final Log LOG = LogFactory.getLog(BaseScannerRegionObserver.class);
 
     public static final String AGGREGATORS = "_Aggs";
     public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
@@ -121,7 +130,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     // In case of Index Write failure, we need to determine that Index mutation
     // is part of normal client write or Index Rebuilder. # PHOENIX-5080
     public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3);
-    
     public enum ReplayWrite {
         TABLE_AND_INDEX,
         INDEX_ONLY,
@@ -374,20 +382,107 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
     }
 
+
     @Override
     public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-        final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
-        final KeyValueScanner s) throws IOException {
+                                               final Store store, final Scan scan,
+                                               final NavigableSet<byte[]> targetCols,
+                                               final KeyValueScanner s) throws IOException {
+        if (storeFileScanDoesntNeedAlteration(store, scan)) {
+            return s;
+        }
 
-      if (scan.isRaw() || ScanInfoUtil.isKeepDeletedCells(store.getScanInfo()) || scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP || TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax())) {
-        return s;
-      }
-      
-      if (s!=null) {
-          s.close();
-      }
-      ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
-      return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
-          c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+        if (s != null) {
+            s.close();
+        }
+        ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
+        return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
+            c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+    }
+
+    private boolean storeFileScanDoesntNeedAlteration(Store store, Scan scan) {
+        boolean isRaw = scan.isRaw();
+        //true if keep deleted cells is either TRUE or TTL
+        boolean keepDeletedCells = ScanInfoUtil.isKeepDeletedCells(store.getScanInfo());
+        boolean timeRangeIsLatest = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
+        boolean timestampIsTransactional =
+            TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax());
+        return isRaw
+            || keepDeletedCells
+            || timeRangeIsLatest
+            || timestampIsTransactional;
+    }
+
+    @Override
+    public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                               final Store store,
+                                               final KeyValueScanner memstoreScanner,
+                                               final InternalScanner s)
+        throws IOException {
+
+        if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+            return s;
+        }
+
+        //close last scanner object before creating a new one
+        if(s != null) {
+            s.close();
+        }
+
+        // Called during flushing the memstore to disk.
+        // Need to retain all the delete markers & all the versions
+        Scan scan = new Scan();
+        scan.setMaxVersions(Integer.MAX_VALUE);
+        ScanInfo oldScanInfo = store.getScanInfo();
+
+        Configuration conf = c.getEnvironment().getConfiguration();
+        //minor compactions and flushes both use "compact retain deletes"
+        ScanType scanType = ScanType.COMPACT_RETAIN_DELETES;
+        ScanInfo scanInfo =
+            ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo, store, scanType);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating the store scanner with :" + scanInfo + ", " +
+                "scan object:" + scan + " for table " + store.getTableName().getNameAsString() +
+                " and region " + store.getRegionInfo().getRegionNameAsString() +
+                " and cf " + store.getColumnFamilyName());
+        }
+        return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+            scanType, store.getSmallestReadPoint(),
+            HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(
+        final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+        List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+        final InternalScanner s) throws IOException {
+
+        if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+            return s;
+        }
+        //close last scanner object before creating a new one
+        if(s != null) {
+            s.close();
+        }
+        Scan scan = new Scan();
+        scan.setMaxVersions(Integer.MAX_VALUE);
+        ScanInfo oldScanInfo = store.getScanInfo();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Compaction triggering for table:" +
+                store.getRegionInfo().getTable().toString()
+                + " with scanType " + scanType  + " for table " +
+                store.getTableName().getNameAsString() + " and region " +
+                store.getRegionInfo().getRegionNameAsString() +
+                " and cf " + store.getColumnFamilyName());
+        }
+
+        Configuration conf = c.getEnvironment().getConfiguration();
+        ScanInfo scanInfo =
+            ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo,
+                store, scanType);
+        return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+            store.getSmallestReadPoint(),
+            earliestPutTs);
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 23091a8..347dd01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
@@ -1546,9 +1547,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // This will lead to failure of cross cluster RPC if the effective user is not
         // the login user. Switch to the login user context to ensure we have the expected
         // security context.
+
         final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
         // since we will make a call to syscat, do nothing if we are compacting syscat itself
-        if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
+        // also, if max lookback age is already configured, this case is already handled elsewhere
+        // unless the index stays disabled beyond the max lookback age, in which case you probably
+        // want to rebuild anyway
+        if (request.isMajor() &&
+            !ScanInfoUtil.isMaxLookbackTimeEnabled(compactionConfig) &&
+            !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
             return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
                 @Override
                 public InternalScanner run() throws Exception {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2f072a3..2bd9ff3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -191,6 +191,10 @@ public enum SQLExceptionCode {
      UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values of CURRENT_SCN and BUILD_INDEX_AT must be equal."),
      ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated when the BUILD_INDEX_AT property is specified"),
      PARENT_TABLE_NOT_FOUND(536, "42913", "Can't drop the index because the parent table in the DROP statement is incorrect."),
+     CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL(537, "42914",
+         "Cannot use SCN to look further back in the past beyond the TTL"),
+    CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915",
+        "Cannot use SCN to look further back in the past beyond the configured max lookback age"),
 
      /**
      * HBase and Phoenix specific implementation defined sub-classes.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index c7f79ae..0e502ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -534,4 +534,5 @@ public class GlobalIndexChecker extends BaseRegionObserver {
     public void stop(CoprocessorEnvironment e) throws IOException {
         this.hTableFactory.shutdown();
     }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index dee02d1..6a4a8f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -51,7 +51,7 @@ public class TransactionUtil {
     }
     
     public static boolean isTransactionalTimestamp(long ts) {
-        return ts >= MAX_NON_TX_TIMESTAMP;
+        return ts >= MAX_NON_TX_TIMESTAMP && ts != HConstants.LATEST_TIMESTAMP;
     }
     
     public static boolean isDelete(Cell cell) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index d22971d..9fa6422 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -49,14 +49,20 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -87,6 +93,7 @@ import org.apache.phoenix.compile.JoinCompiler.JoinTable;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.ByteBasedLikeExpression;
@@ -137,6 +144,7 @@ import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -866,6 +874,13 @@ public class TestUtil {
         conn.createStatement().execute("create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true" + (extraProps.length() == 0 ? "" : ("," + extraProps)));
     }
 
+    public static void dumpTable(Connection conn, TableName tableName)
+        throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableInterface table = cqs.getTable(tableName.getName());
+        dumpTable(table);
+    }
+
     public static void dumpTable(HTableInterface table) throws IOException {
         System.out.println("************ dumping " + table + " **************");
         Scan s = new Scan();
@@ -878,7 +893,9 @@ public class TestUtil {
                 Cell current = null;
                 while (cellScanner.advance()) {
                     current = cellScanner.current();
-                    System.out.println(current);
+                    System.out.println(current + " column=" +
+                        Bytes.toString(CellUtil.cloneQualifier(current)) +
+                        " val=" + Bytes.toString(CellUtil.cloneValue(current)));
                 }
             }
         }
@@ -908,6 +925,46 @@ public class TestUtil {
         return rows;
     }
 
+    public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
+        Scan s = new Scan();
+        s.setRaw(isRaw);
+        s.setMaxVersions();
+
+        CellCount cellCount = new CellCount();
+        try (ResultScanner scanner = table.getScanner(s)) {
+            Result result = null;
+            while ((result = scanner.next()) != null) {
+                CellScanner cellScanner = result.cellScanner();
+                Cell current = null;
+                while (cellScanner.advance()) {
+                    current = cellScanner.current();
+                    cellCount.addCell(Bytes.toString(CellUtil.cloneRow(current)));
+                }
+            }
+        }
+        return cellCount;
+    }
+
+    static class CellCount {
+        private Map<String, Integer> rowCountMap = new HashMap<String, Integer>();
+
+        void addCell(String key) {
+            if (rowCountMap.containsKey(key)) {
+                rowCountMap.put(key, rowCountMap.get(key) + 1);
+            } else {
+                rowCountMap.put(key, 1);
+            }
+        }
+
+        int getCellCount(String key) {
+            if (rowCountMap.containsKey(key)) {
+                return rowCountMap.get(key);
+            } else {
+                return 0;
+            }
+        }
+    }
+
     public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
         try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { 
             System.out.println("************ dumping index status for " + indexName + " **************");
@@ -1216,4 +1273,81 @@ public class TestUtil {
     public static void assertSelectStatement(FilterableStatement selectStatement , String sql) {
         assertTrue(selectStatement.toString().trim().equals(sql));
     }
+
+    public static void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) {
+        assertEquals(code.getErrorCode(), se.getErrorCode());
+        assertTrue("Wrong error message", se.getMessage().contains(code.getMessage()));
+        assertEquals(code.getSQLState(), se.getSQLState());
+    }
+
+    public static void assertTableHasTtl(Connection conn, TableName tableName, int ttl)
+        throws SQLException, IOException {
+        HColumnDescriptor cd = getColumnDescriptor(conn, tableName);
+        Assert.assertEquals(ttl, cd.getTimeToLive());
+    }
+
+    public static void assertTableHasVersions(Connection conn, TableName tableName, int versions)
+        throws SQLException, IOException {
+        HColumnDescriptor cd = getColumnDescriptor(conn, tableName);
+        Assert.assertEquals(versions, cd.getMaxVersions());
+    }
+
+    public static HColumnDescriptor getColumnDescriptor(Connection conn, TableName tableName)
+        throws SQLException, IOException {
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        HTableDescriptor td = admin.getTableDescriptor(tableName);
+        return td.getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+    }
+
+    public static void assertRawRowCount(Connection conn, TableName table, int expectedRowCount)
+        throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        int count = TestUtil.getRawRowCount(cqs.getTable(table.getName()));
+        assertEquals(expectedRowCount, count);
+    }
+
+    public static void assertRawCellCount(Connection conn, TableName tableName,
+                                          byte[] row, int expectedCellCount)
+        throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Table table = cqs.getTable(tableName.getName());
+        CellCount cellCount = getCellCount(table, true);
+        int count = cellCount.getCellCount(Bytes.toString(row));
+        assertEquals(expectedCellCount, count);
+    }
+
+    public static void assertRowExistsAtSCN(String url, String sql, long scn, boolean shouldExist)
+        throws SQLException {
+        boolean rowExists = false;
+        Properties props = new Properties();
+        ResultSet rs;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        try (Connection conn = DriverManager.getConnection(url, props)){
+            rs = conn.createStatement().executeQuery(sql);
+            rowExists = rs.next();
+            if (shouldExist){
+                Assert.assertTrue("Row was not found at time " + scn +
+                        " when it should have been",
+                    rowExists);
+            } else {
+                Assert.assertFalse("Row was found at time " + scn +
+                    " when it should not have been", rowExists);
+            }
+        }
+
+    }
+
+    public static void assertRowHasExpectedValueAtSCN(String url, String sql,
+                                                      long scn, String value) throws SQLException {
+        Properties props = new Properties();
+        ResultSet rs;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        try (Connection conn = DriverManager.getConnection(url, props)){
+            rs = conn.createStatement().executeQuery(sql);
+            Assert.assertTrue("Value " + value + " does not exist at scn " + scn, rs.next());
+            Assert.assertEquals(value, rs.getString(1));
+        }
+
+    }
+
 }
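
For context, a hedged sketch of how the new SCN helpers might be combined in an IT; the
table name, primary key value, and expected raw cell count are made up for illustration,
and getUrl() is assumed to come from the usual Phoenix IT base class:

    long beforeDelete = EnvironmentEdgeManager.currentTimeMillis();
    conn.createStatement().execute("DELETE FROM " + tableName + " WHERE id = 'a'");
    conn.commit();
    // Flashing back to just before the delete should still find the row...
    TestUtil.assertRowExistsAtSCN(getUrl(),
        "SELECT val FROM " + tableName + " WHERE id = 'a'", beforeDelete, true);
    // ...and the raw cells for that row key should still be present (count is illustrative).
    TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), Bytes.toBytes("a"), 2);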