Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/01/16 00:15:49 UTC

[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5645 - BaseScannerRegionObserver should prevent compaction from purging very recently deleted cells

This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
     new fcde213  PHOENIX-5645 - BaseScannerRegionObserver should prevent compaction from purging very recently deleted cells
fcde213 is described below

commit fcde213fc66565c965c2f5ebc461e948c15cf9b7
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Mon Jan 13 11:04:05 2020 -0800

    PHOENIX-5645 - BaseScannerRegionObserver should prevent compaction from purging very recently deleted cells
---
 .../org/apache/phoenix/end2end/MaxLookbackIT.java  | 418 +++++++++++++++++++++
 .../it/java/org/apache/phoenix/end2end/SCNIT.java  |  72 +++-
 .../end2end/index/GlobalIndexCheckerIT.java        |  68 ++--
 .../hadoop/hbase/regionserver/ScanInfoUtil.java    |  86 ++++-
 .../org/apache/phoenix/compile/QueryCompiler.java  |  54 +++
 .../coprocessor/BaseScannerRegionObserver.java     | 121 +++++-
 .../UngroupedAggregateRegionObserver.java          |   9 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   7 +-
 .../apache/phoenix/index/GlobalIndexChecker.java   |   1 +
 .../org/apache/phoenix/util/TransactionUtil.java   |   2 +-
 .../java/org/apache/phoenix/util/TestUtil.java     | 136 ++++++-
 11 files changed, 913 insertions(+), 61 deletions(-)
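
The guardrail added here is driven by one server-side setting,
phoenix.max.lookback.age.seconds (ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY
in the patch, default 0, meaning disabled). As a minimal sketch of turning it
on in a test harness, mirroring MaxLookbackIT.doSetup() below; the 15-second
value is illustrative only:

    // Enable a 15s max lookback window before starting the Phoenix test
    // driver. The config key and the ReadOnlyProps plumbing come from this
    // patch; the value is arbitrary.
    Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
    props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(15));
    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));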

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
new file mode 100644
index 0000000..e27796c
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.assertRawCellCount;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
+import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+
+@NeedsOwnMiniClusterTest
+public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class);
+    private static final int MAX_LOOKBACK_AGE = 15;
+    private static final int ROWS_POPULATED = 2;
+    public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
+    private String tableDDLOptions;
+    private StringBuilder optionBuilder;
+    ManualEnvironmentEdge injectEdge;
+    private int ttl;
+
+    private class ManualEnvironmentEdge extends EnvironmentEdge {
+        // Sometimes a ts of 0 might have a special value, so let's start with 1
+        protected long value = 1L;
+
+        public void setValue(long newValue) {
+            value = newValue;
+        }
+
+        public void incrementValue(long addedValue) {
+            value += addedValue;
+        }
+
+        @Override
+        public long currentTime() {
+            return this.value;
+        }
+    }
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+        props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void beforeTest(){
+        EnvironmentEdgeManager.reset();
+        optionBuilder = new StringBuilder();
+        this.tableDDLOptions = optionBuilder.toString();
+        ttl = 0;
+        injectEdge = new ManualEnvironmentEdge();
+        injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+    }
+
+    @Test
+    public void testTooLowSCNWithMaxLookbackAge() throws Exception {
+        String dataTableName = generateUniqueName();
+        createTable(dataTableName);
+        //advance the clock far enough that we can still find the syscat row for the table
+        injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+        populateTable(dataTableName);
+        long populateTime = EnvironmentEdgeManager.currentTimeMillis();
+        injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+            Long.toString(populateTime));
+        try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+            connscn.createStatement().executeQuery("select * from " + dataTableName);
+        } catch (SQLException se) {
+            SQLExceptionCode code =
+                SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
+            TestUtil.assertSqlExceptionCode(code, se);
+            return;
+        }
+        Assert.fail("We should have thrown an exception for the too-early SCN");
+    }
+
+    @Test(timeout=120000L)
+    public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+            injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+            TableName dataTable = TableName.valueOf(dataTableName);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, 1);
+            TableName indexTable = TableName.valueOf(indexName);
+            //make sure we're past the timestamp at which the inserts were committed
+            injectEdge.incrementValue(1);
+            long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(10); //make sure we delete at a different ts
+            Statement stmt = conn.createStatement();
+            stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
+            Assert.assertEquals(1, stmt.getUpdateCount());
+            conn.commit();
+            //select stmt to get row we deleted
+            String sql = String.format("SELECT * FROM %s WHERE id = 'a'", dataTableName);
+            String indexSql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName);
+            int rowsPlusDeleteMarker = ROWS_POPULATED;
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+            flush(dataTable);
+            flush(indexTable);
+            assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+            long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(1); //new ts for major compaction
+            majorCompact(dataTable, beforeFirstCompactSCN);
+            majorCompact(indexTable, beforeFirstCompactSCN);
+            assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+            assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+            //wait out the lookback window. After this, compactions should purge the deleted row
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+            String notDeletedRowSql =
+                String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
+            String notDeletedIndexRowSql =
+                String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+            assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+            conn.createStatement().execute("upsert into " + dataTableName +
+                " values ('c', 'cd', 'cde', 'cdef')");
+            conn.commit();
+            majorCompact(dataTable, beforeSecondCompactSCN);
+            majorCompact(indexTable, beforeSecondCompactSCN);
+            //should still be ROWS_POPULATED because we added one and deleted one
+            assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+            assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+
+            //deleted row should be gone, but not deleted row should still be there.
+            assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, false);
+            assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+            assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+
+        }
+    }
+
+    @Test(timeout=60000L)
+    public void testTTLAndMaxLookbackAge() throws Exception {
+        ttl = 20;
+        optionBuilder.append("TTL=" + ttl);
+        tableDDLOptions = optionBuilder.toString();
+        Configuration conf = getUtility().getConfiguration();
+        //disable automatic memstore flushes
+        long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+            HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+        conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+            //increment to make sure we don't "look back" past table creation
+            injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, 1);
+            injectEdge.incrementValue(1);
+            long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+            TableName dataTable = TableName.valueOf(dataTableName);
+            TableName indexTable = TableName.valueOf(indexName);
+            assertTableHasTtl(conn, dataTable, ttl);
+            assertTableHasTtl(conn, indexTable, ttl);
+            //first make sure we inserted correctly
+            String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+            String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+            assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true);
+            int originalRowCount = 2;
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //force a flush
+            flush(dataTable);
+            flush(indexTable);
+            //flush shouldn't have changed it
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            assertExplainPlan(conn, indexSql, dataTableName, indexName);
+            long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) -
+                (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+            if (timeToAdvance > 0) {
+                injectEdge.incrementValue(timeToAdvance);
+            }
+            //make sure it's still on disk
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            injectEdge.incrementValue(1); //get a new timestamp for compaction
+            majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
+            majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+            //nothing should have been purged by this major compaction
+            assertRawRowCount(conn, dataTable, originalRowCount);
+            assertRawRowCount(conn, indexTable, originalRowCount);
+            //now wait the TTL
+            timeToAdvance = (ttl * 1000) -
+                (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+            if (timeToAdvance > 0) {
+                injectEdge.incrementValue(timeToAdvance);
+            }
+            //make sure that we can compact away the now-expired rows
+            majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
+            majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+            //note that before HBase 1.4, we don't have HBASE-17956
+            // and this will always return 0 whether it's still on-disk or not
+            assertRawRowCount(conn, dataTable, 0);
+            assertRawRowCount(conn, indexTable, 0);
+        } finally{
+            conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+        }
+    }
+
+    @Test(timeout=60000)
+    public void testRecentMaxVersionsNotCompactedAway() throws Exception {
+        int versions = 2;
+        optionBuilder.append("VERSIONS=" + versions);
+        tableDDLOptions = optionBuilder.toString();
+        String firstValue = "abc";
+        String secondValue = "def";
+        String thirdValue = "ghi";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexName = generateUniqueName();
+            createTable(dataTableName);
+            //increment to make sure we don't "look back" past table creation
+            injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+            populateTable(dataTableName);
+            createIndex(dataTableName, indexName, versions);
+            injectEdge.incrementValue(1); //increment by 1 so we can see our write
+            long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+            //make sure table and index metadata is set up right for versions
+            TableName dataTable = TableName.valueOf(dataTableName);
+            TableName indexTable = TableName.valueOf(indexName);
+            assertTableHasVersions(conn, dataTable, versions);
+            assertTableHasVersions(conn, indexTable, versions);
+            //check query optimizer is doing what we expect
+            String dataTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+            String indexTableSelectSql =
+                String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+            assertExplainPlan(conn, indexTableSelectSql, dataTableName, indexName);
+            //make sure the data was inserted correctly in the first place
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, afterInsertSCN, firstValue);
+            assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, afterInsertSCN, firstValue);
+            //force first update to get a distinct ts
+            injectEdge.incrementValue(1);
+            updateColumn(conn, dataTableName, "id", "a", "val2", secondValue);
+            injectEdge.incrementValue(1); //now make update visible
+            long afterFirstUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
+            //force second update to get a distinct ts
+            injectEdge.incrementValue(1);
+            updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue);
+            injectEdge.incrementValue(1);
+            long afterSecondUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
+            injectEdge.incrementValue(1);
+            //check to make sure we can see all three versions at the appropriate times
+            String[] allValues = {firstValue, secondValue, thirdValue};
+            long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN};
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+            flush(dataTable);
+            flush(indexTable);
+            //after flush, check to make sure we can see all three versions at the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+            majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
+            majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+            //after major compaction, check to make sure we can see all three versions
+            // at the appropriate times
+            assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+            assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            long afterLookbackAgeSCN = EnvironmentEdgeManager.currentTimeMillis();
+            majorCompact(dataTable, afterLookbackAgeSCN);
+            majorCompact(indexTable, afterLookbackAgeSCN);
+            //empty column, 1 version of val1, 3 versions of val2, 1 version of val3 = 6
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
+            //2 versions of empty column, 2 versions of val2,
+            // 2 versions of val3 (since we write whole rows to index) = 6
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
+            //empty column + 1 version each of val1,2 and 3 = 4
+            assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
+            //1 version of empty column, 1 version of val2, 1 version of val3 = 3
+            assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3);
+        }
+    }
+
+    private void flush(TableName table) throws IOException {
+        Admin admin = getUtility().getHBaseAdmin();
+        admin.flush(table);
+    }
+
+    private void majorCompact(TableName table, long compactionRequestedSCN) throws Exception {
+        Admin admin = getUtility().getHBaseAdmin();
+        admin.majorCompact(table);
+        long lastCompactionTimestamp;
+        AdminProtos.GetRegionInfoResponse.CompactionState state = null;
+        while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table)) < compactionRequestedSCN
+            || (state = admin.getCompactionState(table)).
+            equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Last compaction time:" + lastCompactionTimestamp);
+                LOG.trace("CompactionState: " + state);
+            }
+            Thread.sleep(100);
+        }
+    }
+
+    private void assertMultiVersionLookbacks(String dataTableSelectSql,
+                                             String[] values, long[] scns)
+        throws Exception {
+        //make sure we can still look back after updating
+        for (int k = 0; k < values.length; k++){
+            assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, scns[k], values[k]);
+        }
+    }
+
+    private void updateColumn(Connection conn, String dataTableName,
+                              String idColumn, String id, String valueColumn, String value)
+        throws SQLException {
+        String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
+            dataTableName, idColumn, valueColumn, id, value);
+        conn.createStatement().execute(upsertSql);
+        conn.commit();
+    }
+
+    private void createTable(String tableName) throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            String createSql = "create table " + tableName +
+                " (id varchar(10) not null primary key, val1 varchar(10), " +
+                "val2 varchar(10), val3 varchar(10))" + tableDDLOptions;
+            conn.createStatement().execute(createSql);
+            conn.commit();
+        }
+    }
+    private void populateTable(String tableName) throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+            conn.commit();
+        }
+    }
+
+    private void createIndex(String dataTableName, String indexTableName, int indexVersions)
+        throws SQLException {
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            //4.14 and below need to set TTL manually on indexes.
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                dataTableName + " (val1) include (val2, val3)" +
+                " VERSIONS=" + indexVersions +
+                (ttl > 0 ? ", TTL=" + ttl : ""));
+            conn.commit();
+        }
+    }
+
+    public static void assertExplainPlan(Connection conn, String selectSql,
+                                         String dataTableFullName, String indexTableFullName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+        IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName);
+    }
+
+}
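
The client-visible contract pinned down above: a connection whose CurrentSCN
falls outside the max lookback window now fails at query compile time with
error 538 (and, per the SCNIT changes below, an SCN older than a table's TTL
fails with 537) instead of silently reading over possibly-compacted history.
A hedged sketch of tripping the new check; jdbcUrl, tooOldScn, and MY_TABLE
are placeholders:

    // Assumes max lookback is enabled server-side and tooOldScn is more than
    // phoenix.max.lookback.age.seconds in the past.
    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(tooOldScn));
    try (Connection conn = DriverManager.getConnection(jdbcUrl, props)) {
        conn.createStatement().executeQuery("SELECT * FROM MY_TABLE");
    } catch (SQLException e) {
        // expect CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE (538)
    }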
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
index 6c45b06..d79752e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
@@ -24,11 +24,17 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class SCNIT extends ParallelStatsDisabledIT {
@@ -67,7 +73,7 @@ public class SCNIT extends ParallelStatsDisabledIT {
 			rs.close();
 		}
 		props.clear();
-		props.setProperty("CurrentSCN", Long.toString(timeAfterDelete));
+		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete));
 		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
 			ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
 			assertTrue(rs.next());
@@ -82,28 +88,68 @@ public class SCNIT extends ParallelStatsDisabledIT {
 
 	@Test
 	public void testSCNWithTTL() throws Exception {
+		int ttl = 2;
+		String fullTableName = createTableWithTTL(ttl);
+		//sleep for one second longer than ttl
+		Thread.sleep(ttl * 1000 + 1000);
+		Properties props = new Properties();
+		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+				Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
+		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+			ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
+			assertFalse(rs.next());
+			rs.close();
+		}
+	}
+
+	@Test
+	public void testTooLowSCNWithTTL() throws Exception {
+		//if scn is for an older time than a table's ttl, it should throw a SQLException
+		int ttl = 2;
+		String fullTableName = createTableWithTTL(ttl);
+		int sleepTime = (ttl + 1)* 1000;
+		//need to sleep long enough for the SCN to still find the syscat row for the table
+		Thread.sleep(sleepTime);
+		Properties props = new Properties();
+		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+			Long.toString(EnvironmentEdgeManager.currentTime() - sleepTime));
+		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+			connscn.createStatement().
+				executeQuery(String.format("select * from %s", fullTableName));
+		} catch (SQLException se){
+			SQLExceptionCode code = SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL;
+			assertSqlExceptionCode(code, se);
+			return;
+		}
+		Assert.fail("We should have thrown an exception for the too-early SCN");
+	}
+
+	private void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) {
+		assertEquals(code.getErrorCode(), se.getErrorCode());
+		assertTrue("Wrong error message", se.getMessage().contains(code.getMessage()));
+		assertEquals(code.getSQLState(), se.getSQLState());
+	}
+
+	private String createTableWithTTL(int ttl) throws SQLException, InterruptedException {
 		String schemaName = generateUniqueName();
 		String tableName = generateUniqueName();
+		StringBuilder optionsBuilder = new StringBuilder();
+		if (ttl > 0){
+			optionsBuilder.append("TTL=");
+			optionsBuilder.append(ttl);
+		}
+		String ddlOptions = optionsBuilder.toString();
 		String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
 		try (Connection conn = DriverManager.getConnection(getUrl())) {
 			conn.createStatement()
-					.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) TTL=2");
+					.execute(String.format("CREATE TABLE %s" +
+						"(k VARCHAR PRIMARY KEY, v VARCHAR) %s", fullTableName, ddlOptions));
 			conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
 			conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
 			conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
 			conn.commit();
-			// TTL is 2 sec
-			Thread.sleep(3000);
-		}
-
-		Properties props = new Properties();
-		props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
-				Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
-		try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
-			ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
-			assertFalse(rs.next());
-			rs.close();
 		}
+		return fullTableName;
 	}
 
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index c09cad7..d0cdb3d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,15 +54,12 @@ import com.google.common.collect.Lists;
 public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     private static final Log LOG = LogFactory.getLog(GlobalIndexCheckerIT.class);
     private final boolean async;
-    private final String tableDDLOptions;
-
+    private String tableDDLOptions;
+    private StringBuilder optionBuilder;
+    private final boolean encoded;
     public GlobalIndexCheckerIT(boolean async, boolean encoded) {
         this.async = async;
-        StringBuilder optionBuilder = new StringBuilder();
-        if (!encoded) {
-            optionBuilder.append(" COLUMN_ENCODED_BYTES=0 ");
-        }
-        this.tableDDLOptions = optionBuilder.toString();
+        this.encoded = encoded;
     }
 
     @BeforeClass
@@ -71,6 +69,15 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
+    @Before
+    public void beforeTest(){
+        optionBuilder = new StringBuilder();
+        if (!encoded) {
+            optionBuilder.append(" COLUMN_ENCODED_BYTES=0");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
     @Parameters(
             name = "async={0},encoded={1}")
     public static Collection<Object[]> data() {
@@ -305,17 +312,8 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     public void testOnePhaseOverwrite() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
-            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
             String indexTableName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
-                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
-                    dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
-            if (async) {
-                // run the index MR job.
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
-            }
+            createTableAndIndexes(conn, dataTableName, indexTableName);
             // Configure IndexRegionObserver to skip the last two write phase (i.e., the data table update and post index
             // update phase) and check that this does not impact the correctness (one overwrite)
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -385,21 +383,35 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private void createTableAndIndexes(Connection conn, String dataTableName,
+                                       String indexTableName) throws Exception {
+        createTableAndIndexes(conn, dataTableName, indexTableName, 1);
+    }
+
+    private void createTableAndIndexes(Connection conn, String dataTableName,
+                                       String indexTableName, int indexVersions) throws Exception {
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
+                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") +
+            " VERSIONS=" + indexVersions);
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
+                dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")+
+            " VERSIONS=" + indexVersions);
+        conn.commit();
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
+        }
+    }
+
     @Test
     public void testFailDataTableAndPostIndexRowUpdate() throws Exception {
-        String dataTableName = generateUniqueName();
-        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+
         try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
             String indexName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
-                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
-            conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
-                    dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
-            if (async) {
-                // run the index MR job.
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
-            }
+            createTableAndIndexes(conn, dataTableName, indexName);
             // Configure IndexRegionObserver to fail the last two write phase (i.e., the data table update and post index update phase)
             // and check that this does not impact the correctness
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
index e0d62a2..6854a75 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -20,28 +20,108 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.NavigableSet;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.client.Scan;
 
 public class ScanInfoUtil {
+    public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+        "phoenix.max.lookback.age.seconds";
+    public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
     private ScanInfoUtil() {
     }
-    
+
     public static boolean isKeepDeletedCells(ScanInfo scanInfo) {
         return scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE;
     }
-    
+
     public static ScanInfo cloneScanInfoWithKeepDeletedCells(ScanInfo scanInfo) {
         return new ScanInfo(scanInfo.getConfiguration(), scanInfo.getFamily(), scanInfo.getMinVersions(),
                     scanInfo.getMaxVersions(), scanInfo.getTtl(), KeepDeletedCells.TRUE,
                     scanInfo.getTimeToPurgeDeletes(), scanInfo.getComparator());
     }
 
-    public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,long readPt) throws IOException {
+    public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+                                                  final NavigableSet<byte[]> columns,long readPt)
+        throws IOException {
         if(!scan.isReversed()) {
             return new StoreScanner(store, scanInfo, scan, columns,readPt);
         } else {
             return new ReversedStoreScanner(store, scanInfo, scan, columns,readPt);
         }
     }
+
+    public static long getTimeToLiveForCompactions(HColumnDescriptor columnDescriptor,
+                                                   ScanInfo scanInfo) {
+        long ttl = scanInfo.getTtl();
+        long maxLookbackTtl = getMaxLookback(scanInfo.getConfiguration());
+        if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
+            if (ttl == Long.MAX_VALUE
+                && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
+                // If the user left TTL at the default (FOREVER) and KeepDeletedCells
+                // is FALSE or TTL, use the max lookback age as the ttl so that
+                // unwanted delete markers can still be purged once the window passes
+                ttl = maxLookbackTtl;
+            } else {
+                //if there is a TTL, use TTL instead of max lookback age.
+                // Max lookback age should be more recent or equal to TTL
+                ttl = Math.max(ttl, maxLookbackTtl);
+            }
+        }
+
+        return ttl;
+    }
+
+    /*
+     * If KeepDeletedCells is FALSE or TTL, let delete markers age out
+     * once the lookback window has passed.
+     */
+    private static KeepDeletedCells getKeepDeletedCells(final Store store, ScanType scanType) {
+        //if we're doing a minor compaction or flush, always set keep deleted cells
+        //to true. Otherwise, if keep deleted cells is false or TTL, use KeepDeletedCells TTL,
+        //where the value of the ttl might be overridden to the max lookback age elsewhere
+        return (store.getFamily().getKeepDeletedCells() == KeepDeletedCells.TRUE
+            || scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ?
+            KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
+    }
+
+    /*
+     * If the user set a TTL, we should leave MIN_VERSIONS at the default (0 in most cases).
+     * Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want
+     * Math.max(maxVersions, minVersions, 1)
+     */
+    private static int getMinVersions(ScanInfo oldScanInfo, final Store store) {
+        return oldScanInfo.getTtl() != Long.MAX_VALUE ? store.getFamily().getMinVersions()
+            : Math.max(Math.max(store.getFamily().getMinVersions(),
+            store.getFamily().getMaxVersions()),1);
+    }
+
+    public static ScanInfo getScanInfoForFlushesAndCompactions(Configuration conf,
+                                                               ScanInfo oldScanInfo,
+                                                               final Store store,
+                                                               ScanType type) {
+        long ttl = getTimeToLiveForCompactions(store.getFamily(), oldScanInfo);
+        KeepDeletedCells keepDeletedCells = getKeepDeletedCells(store, type);
+        int minVersions = getMinVersions(oldScanInfo, store);
+        return new ScanInfo(conf, store.getFamily().getName(), minVersions,
+            Integer.MAX_VALUE, ttl, keepDeletedCells,
+            oldScanInfo.getTimeToPurgeDeletes(),
+            oldScanInfo.getComparator());
+    }
+
+    private static long getMaxLookback(Configuration conf){
+        //config param is in seconds, switch to millis
+        return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+    }
+
+    public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+        return isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
+    }
+
+    private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+        return maxLookbackTime > 0L;
+    }
 }
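
The effective-TTL rule in getTimeToLiveForCompactions() above is easier to
see with numbers. A minimal standalone sketch, assuming a 15s lookback
window; the helper name is local to this example, and the boolean stands in
for KeepDeletedCells.TRUE:

    // effectiveTtl(Long.MAX_VALUE, 15000, false) == 15000  (FOREVER bounded by lookback)
    // effectiveTtl(20000, 15000, false)          == 20000  (explicit TTL wins when longer)
    // effectiveTtl(20000, 0, false)              == 20000  (lookback disabled, ttl unchanged)
    static long effectiveTtl(long ttlMs, long maxLookbackMs, boolean keepDeletedCells) {
        if (maxLookbackMs <= 0) {
            return ttlMs;                         // lookback disabled
        }
        if (ttlMs == Long.MAX_VALUE && !keepDeletedCells) {
            return maxLookbackMs;                 // default TTL: shrink to the window
        }
        return Math.max(ttlMs, maxLookbackMs);    // never purge inside the window
    }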
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3e5f5ee..936b7a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -25,13 +25,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
 import org.apache.phoenix.compile.JoinCompiler.JoinTable;
 import org.apache.phoenix.compile.JoinCompiler.Table;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
 import org.apache.phoenix.execute.ClientScanPlan;
@@ -66,6 +71,7 @@ import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SubqueryParseNode;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -74,6 +80,7 @@ import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -146,6 +153,7 @@ public class QueryCompiler {
      * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
      */
     public QueryPlan compile() throws SQLException{
+        verifySCN();
         QueryPlan plan;
         if (select.isUnion()) {
             plan = compileUnionAll(select);
@@ -155,6 +163,52 @@ public class QueryCompiler {
         return plan;
     }
 
+    private void verifySCN() throws SQLException {
+        PhoenixConnection conn = statement.getConnection();
+        Long scn = conn.getSCN();
+        if (scn == null) {
+            return;
+        }
+        List<TableRef> scnTooOldTableRefs = new ArrayList<TableRef>();
+        ColumnResolver resolver =
+            FromCompiler.getResolverForQuery(select, conn);
+        List<TableRef> involvedTables = resolver.getTables();
+        int maxLookBackAge = conn.getQueryServices().
+            getConfiguration().getInt(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE);
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        if (maxLookBackAge > 0 && now - maxLookBackAge * 1000L > scn){
+            throw new SQLExceptionInfo.Builder(
+                SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE)
+                .build().buildException();
+        }
+        for (TableRef tableRef : involvedTables) {
+            byte[] tableQualifier = tableRef.getTable().getPhysicalName().getBytes();
+            //we can have a tableRef with an empty table, such as with sequences
+            if (tableQualifier.length > 0) {
+                HTableDescriptor td = conn.getQueryServices().getTableDescriptor(tableQualifier);
+                HColumnDescriptor[] cds = td.getColumnFamilies();
+                now = EnvironmentEdgeManager.currentTimeMillis();
+                if (cds.length > 0){
+                    //Phoenix only allows a single table level TTL, so any CF will do
+                    if (now - cds[0].getTimeToLive() * 1000L > scn) {
+                        scnTooOldTableRefs.add(tableRef);
+                    }
+                }
+            }
+        }
+        if (scnTooOldTableRefs.size() > 0) {
+            TableRef tableRef = scnTooOldTableRefs.get(0);
+            throw new SQLExceptionInfo.Builder(
+                SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL)
+                .setSchemaName(tableRef.getTable().getSchemaName().getString())
+                .setTableName(tableRef.getTable().getTableName().getString())
+                .build()
+                .buildException();
+        }
+
+    }
+
     public QueryPlan compileUnionAll(SelectStatement select) throws SQLException { 
         List<SelectStatement> unionAllSelects = select.getSelects();
         List<QueryPlan> plans = new ArrayList<QueryPlan>();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 3da3f6b..d229b28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -18,13 +18,18 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -32,14 +37,17 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -56,6 +64,7 @@ import org.apache.phoenix.util.TransactionUtil;
 
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
+    private static final Log LOG = LogFactory.getLog(BaseScannerRegionObserver.class);
 
     public static final String AGGREGATORS = "_Aggs";
     public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
@@ -124,7 +133,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     // In case of Index Write failure, we need to determine that Index mutation
     // is part of normal client write or Index Rebuilder. # PHOENIX-5080
     public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3);
-    
     public enum ReplayWrite {
         TABLE_AND_INDEX,
         INDEX_ONLY,
@@ -368,20 +376,107 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
     }
 
+
     @Override
     public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-        final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
-        final KeyValueScanner s) throws IOException {
+                                               final Store store, final Scan scan,
+                                               final NavigableSet<byte[]> targetCols,
+                                               final KeyValueScanner s) throws IOException {
+        if (storeFileScanDoesntNeedAlteration(store, scan)) {
+            return s;
+        }
 
-      if (scan.isRaw() || ScanInfoUtil.isKeepDeletedCells(store.getScanInfo()) || scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP || TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax())) {
-        return s;
-      }
-      
-      if (s!=null) {
-          s.close();
-      }
-      ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
-      return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
-          c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+        if (s != null) {
+            s.close();
+        }
+        ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
+        return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
+            c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+    }
+
+    private boolean storeFileScanDoesntNeedAlteration(Store store, Scan scan) {
+        boolean isRaw = scan.isRaw();
+        //true if keep deleted cells is either TRUE or TTL
+        boolean keepDeletedCells = ScanInfoUtil.isKeepDeletedCells(store.getScanInfo());
+        boolean timeRangeIsLatest = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
+        boolean timestampIsTransactional =
+            TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax());
+        return isRaw
+            || keepDeletedCells
+            || timeRangeIsLatest
+            || timestampIsTransactional;
+    }
+
+    @Override
+    public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                               final Store store,
+                                               final KeyValueScanner memstoreScanner,
+                                               final InternalScanner s)
+        throws IOException {
+
+        if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+            return s;
+        }
+
+        //close last scanner object before creating a new one
+        if(s != null) {
+            s.close();
+        }
+
+        // Called during flushing the memstore to disk.
+        // Need to retain all the delete markers & all the versions
+        Scan scan = new Scan();
+        scan.setMaxVersions(Integer.MAX_VALUE);
+        ScanInfo oldScanInfo = store.getScanInfo();
+
+        Configuration conf = c.getEnvironment().getConfiguration();
+        //minor compactions and flushes both use "compact retain deletes"
+        ScanType scanType = ScanType.COMPACT_RETAIN_DELETES;
+        ScanInfo scanInfo =
+            ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo, store, scanType);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating the store scanner with :" + scanInfo + ", " +
+                "scan object:" + scan + " for table " + store.getTableName().getNameAsString() +
+                " and region " + store.getRegionInfo().getRegionNameAsString() +
+                " and cf " + store.getColumnFamilyName());
+        }
+        return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+            scanType, store.getSmallestReadPoint(),
+            HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(
+        final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+        List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+        final InternalScanner s) throws IOException {
+
+        if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+            return s;
+        }
+        //close last scanner object before creating a new one
+        if(s != null) {
+            s.close();
+        }
+        Scan scan = new Scan();
+        scan.setMaxVersions(Integer.MAX_VALUE);
+        ScanInfo oldScanInfo = store.getScanInfo();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Compaction triggering for table:" +
+                store.getRegionInfo().getTable().toString()
+                + " with scanType " + scanType  + " for table " +
+                store.getTableName().getNameAsString() + " and region " +
+                store.getRegionInfo().getRegionNameAsString() +
+                " and cf " + store.getColumnFamilyName());
+        }
+
+        Configuration conf = c.getEnvironment().getConfiguration();
+        ScanInfo scanInfo =
+            ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo,
+                store, scanType);
+        return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+            store.getSmallestReadPoint(),
+            earliestPutTs);
     }
 }
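
Taken together, the three hooks above split the work like this (a hedged
recap, not text from the patch):

    // preStoreScannerOpen   - user scans with an explicit, non-transactional
    //                         time range get a ScanInfo cloned with
    //                         KeepDeletedCells.TRUE, so lookback reads can
    //                         still see deleted rows
    // preFlushScannerOpen   - flushes run with ScanType.COMPACT_RETAIN_DELETES
    //                         and unlimited versions, so nothing is dropped on
    //                         the way to disk
    // preCompactScannerOpen - compactions use the lookback-adjusted ScanInfo
    //                         from ScanInfoUtil, so a major compaction purges
    //                         nothing newer than the window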
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b597c49..2ee4831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
@@ -1335,9 +1336,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // This will lead to failure of cross cluster RPC if the effective user is not
         // the login user. Switch to the login user context to ensure we have the expected
         // security context.
+
         final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
         // since we will make a call to syscat, do nothing if we are compacting syscat itself
-        if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
+        // also, if max lookback age is configured, this case is already handled elsewhere,
+        // unless the index stays disabled beyond the max lookback age, in which case you probably
+        // want to rebuild anyway
+        if (request.isMajor() &&
+            !ScanInfoUtil.isMaxLookbackTimeEnabled(compactionConfig) &&
+            !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
             return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
                 @Override
                 public InternalScanner run() throws Exception {
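
Condensed, the gate above means the login-user rebuild check now runs only for major
compactions of non-SYSTEM.CATALOG tables while max lookback is disabled; as a sketch,
using names taken directly from the hunk:

    boolean runRebuildCheck = request.isMajor()
        && !ScanInfoUtil.isMaxLookbackTimeEnabled(compactionConfig)
        && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName);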
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 94b9c39..42acab8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -184,7 +184,12 @@ public enum SQLExceptionCode {
      INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
      UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values of CURRENT_SCN and BUILD_INDEX_AT must be equal."),
      ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated when the BUILD_INDEX_AT property is specified"),
-     /**
+     CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL(537, "42914",
+         "Cannot use SCN to look further back in the past beyond the TTL"),
+     CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915",
+         "Cannot use SCN to look further back in the past beyond the configured max lookback age"),
+
+    /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.
      *
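
These two codes are raised when a CURRENT_SCN points further back than the table's TTL
or the configured max lookback window. A hedged sketch of raising one in the builder
style Phoenix uses for other codes in this enum (the real call site is not shown in
this excerpt, and schemaName/tableName are placeholders):

    throw new SQLExceptionInfo.Builder(
            SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE)
        .setSchemaName(schemaName)  // placeholder
        .setTableName(tableName)    // placeholder
        .build().buildException();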
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index acd4804..5d6c83f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -537,4 +537,5 @@ public class GlobalIndexChecker extends BaseRegionObserver {
     public void stop(CoprocessorEnvironment e) throws IOException {
         this.hTableFactory.shutdown();
     }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index dee02d1..6a4a8f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -51,7 +51,7 @@ public class TransactionUtil {
     }
     
     public static boolean isTransactionalTimestamp(long ts) {
-        return ts >= MAX_NON_TX_TIMESTAMP;
+        return ts >= MAX_NON_TX_TIMESTAMP && ts != HConstants.LATEST_TIMESTAMP;
     }
     
     public static boolean isDelete(Cell cell) {
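
The added clause matters because HConstants.LATEST_TIMESTAMP is Long.MAX_VALUE: a cell
with no explicit timestamp would otherwise always clear the MAX_NON_TX_TIMESTAMP bound
and be misread as transactional. A one-line regression check (JUnit assumed):

    // LATEST_TIMESTAMP marks "no timestamp set", not a transactional write time
    assertFalse(TransactionUtil.isTransactionalTimestamp(HConstants.LATEST_TIMESTAMP));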
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c1901cb..9424379 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -48,14 +48,20 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -79,6 +85,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.ByteBasedLikeExpression;
@@ -125,6 +132,7 @@ import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -842,6 +850,13 @@ public class TestUtil {
         conn.createStatement().execute("create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true" + (extraProps.length() == 0 ? "" : ("," + extraProps)));
     }
 
+    public static void dumpTable(Connection conn, TableName tableName)
+        throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableInterface table = cqs.getTable(tableName.getName());
+        dumpTable(table);
+    }
+
     public static void dumpTable(HTableInterface table) throws IOException {
         System.out.println("************ dumping " + table + " **************");
         Scan s = new Scan();
@@ -854,7 +869,9 @@ public class TestUtil {
                 Cell current = null;
                 while (cellScanner.advance()) {
                     current = cellScanner.current();
-                    System.out.println(current);
+                    System.out.println(current + " column=" +
+                        Bytes.toString(CellUtil.cloneQualifier(current)) +
+                        " val=" + Bytes.toString(CellUtil.cloneValue(current)));
                 }
             }
         }
@@ -884,6 +901,46 @@ public class TestUtil {
         return rows;
     }
 
+    public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
+        Scan s = new Scan();
+        s.setRaw(isRaw);
+        s.setMaxVersions();
+
+        CellCount cellCount = new CellCount();
+        try (ResultScanner scanner = table.getScanner(s)) {
+            Result result = null;
+            while ((result = scanner.next()) != null) {
+                CellScanner cellScanner = result.cellScanner();
+                Cell current = null;
+                while (cellScanner.advance()) {
+                    current = cellScanner.current();
+                    cellCount.addCell(Bytes.toString(CellUtil.cloneRow(current)));
+                }
+            }
+        }
+        return cellCount;
+    }
+
+    static class CellCount {
+        private Map<String, Integer> rowCountMap = new HashMap<String, Integer>();
+
+        void addCell(String key) {
+            if (rowCountMap.containsKey(key)) {
+                rowCountMap.put(key, rowCountMap.get(key) + 1);
+            } else {
+                rowCountMap.put(key, 1);
+            }
+        }
+
+        int getCellCount(String key) {
+            if (rowCountMap.containsKey(key)) {
+                return rowCountMap.get(key);
+            } else {
+                return 0;
+            }
+        }
+    }
+
     public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
         try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { 
             System.out.println("************ dumping index status for " + indexName + " **************");
@@ -1131,4 +1188,81 @@ public class TestUtil {
         }
         return false;
     }
+
+
+    public static void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) {
+        assertEquals(code.getErrorCode(), se.getErrorCode());
+        assertTrue("Wrong error message", se.getMessage().contains(code.getMessage()));
+        assertEquals(code.getSQLState(), se.getSQLState());
+    }
+
+    public static void assertTableHasTtl(Connection conn, TableName tableName, int ttl)
+        throws SQLException, IOException {
+        HColumnDescriptor cd = getColumnDescriptor(conn, tableName);
+        Assert.assertEquals(ttl, cd.getTimeToLive());
+    }
+
+    public static void assertTableHasVersions(Connection conn, TableName tableName, int versions)
+        throws SQLException, IOException {
+        HColumnDescriptor cd = getColumnDescriptor(conn, tableName);
+        Assert.assertEquals(versions, cd.getMaxVersions());
+    }
+
+    public static HColumnDescriptor getColumnDescriptor(Connection conn, TableName tableName)
+        throws SQLException, IOException {
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        HTableDescriptor td = admin.getTableDescriptor(tableName);
+        return td.getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+    }
+
+    public static void assertRawRowCount(Connection conn, TableName table, int expectedRowCount)
+        throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        int count = TestUtil.getRawRowCount(cqs.getTable(table.getName()));
+        assertEquals(expectedRowCount, count);
+    }
+
+    public static void assertRawCellCount(Connection conn, TableName tableName,
+                                          byte[] row, int expectedCellCount)
+        throws SQLException, IOException {
+        ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        Table table = cqs.getTable(tableName.getName());
+        CellCount cellCount = getCellCount(table, true);
+        int count = cellCount.getCellCount(Bytes.toString(row));
+        assertEquals(expectedCellCount, count);
+    }
+
+    public static void assertRowExistsAtSCN(String url, String sql, long scn, boolean shouldExist)
+        throws SQLException {
+        boolean rowExists = false;
+        Properties props = new Properties();
+        ResultSet rs;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        try (Connection conn = DriverManager.getConnection(url, props)){
+            rs = conn.createStatement().executeQuery(sql);
+            rowExists = rs.next();
+            if (shouldExist){
+                Assert.assertTrue("Row was not found at time " + scn +
+                        " when it should have been",
+                    rowExists);
+            } else {
+                Assert.assertFalse("Row was found at time " + scn +
+                    " when it should not have been", rowExists);
+            }
+        }
+
+    }
+
+    public static void assertRowHasExpectedValueAtSCN(String url, String sql,
+                                                      long scn, String value) throws SQLException {
+        Properties props = new Properties();
+        ResultSet rs;
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        try (Connection conn = DriverManager.getConnection(url, props)){
+            rs = conn.createStatement().executeQuery(sql);
+            Assert.assertTrue("Value " + value + " does not exist at scn " + scn, rs.next());
+            Assert.assertEquals(value, rs.getString(1));
+        }
+
+    }
 }
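
Taken together, the new helpers let an integration test pin down both sides of the max
lookback contract: rows stay visible to SCN queries inside the window, and queries
beyond it fail with the new error code. A hedged usage sketch; getUrl(), dataTableName,
and the SCN variables are assumed fixtures of the enclosing test:

    String sql = "SELECT val FROM " + dataTableName + " WHERE id = 'a'";
    // Inside the lookback window, the row is visible before the delete and gone after it
    TestUtil.assertRowExistsAtSCN(getUrl(), sql, justBeforeDeleteSCN, true);
    TestUtil.assertRowExistsAtSCN(getUrl(), sql, justAfterDeleteSCN, false);

    // Beyond the window, the connection-level SCN is rejected at query time
    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(tooOldSCN));
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.createStatement().executeQuery(sql);
        fail("Expected the query to be rejected");
    } catch (SQLException se) {
        TestUtil.assertSqlExceptionCode(
            SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE, se);
    }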