Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/01/16 00:23:27 UTC
[phoenix] branch 4.14-HBase-1.4 updated: PHOENIX-5645 - BaseScannerRegionObserver should prevent compaction from purging very recently deleted cells
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
new 9b46ffd PHOENIX-5645 - BaseScannerRegionObserver should prevent compaction from purging very recently deleted cells
9b46ffd is described below
commit 9b46ffd47f1e882c3545911a41142c3ac69c56d1
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Mon Jan 13 11:04:05 2020 -0800
PHOENIX-5645 - BaseScannerRegionObserver should prevent compaction from purging very recently deleted cells
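
In outline: the patch adds a "max lookback age" window, configured through the
phoenix.max.lookback.age.seconds property defined in ScanInfoUtil below. While
the window is open, flush and compaction scanners are rebuilt so that deleted
cells and old versions are retained, and QueryCompiler rejects SCN queries that
reach back further than the window. A minimal sketch of enabling the feature
(the 15-second value is illustrative, mirroring the test setup below):

    Configuration conf = HBaseConfiguration.create();
    // property name from ScanInfoUtil; the value is in seconds
    conf.setLong(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 15L);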
---
.../org/apache/phoenix/end2end/MaxLookbackIT.java | 418 +++++++++++++++++++++
.../it/java/org/apache/phoenix/end2end/SCNIT.java | 72 +++-
.../end2end/index/GlobalIndexCheckerIT.java | 68 ++--
.../hadoop/hbase/regionserver/ScanInfoUtil.java | 86 ++++-
.../org/apache/phoenix/compile/QueryCompiler.java | 54 +++
.../coprocessor/BaseScannerRegionObserver.java | 121 +++++-
.../UngroupedAggregateRegionObserver.java | 9 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 7 +-
.../apache/phoenix/index/GlobalIndexChecker.java | 1 +
.../org/apache/phoenix/util/TransactionUtil.java | 2 +-
.../java/org/apache/phoenix/util/TestUtil.java | 136 ++++++-
11 files changed, 913 insertions(+), 61 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
new file mode 100644
index 0000000..e27796c
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.assertRawCellCount;
+import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
+import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
+import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
+import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+
+@NeedsOwnMiniClusterTest
+public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
+ private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class);
+ private static final int MAX_LOOKBACK_AGE = 15;
+ private static final int ROWS_POPULATED = 2;
+ public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
+ private String tableDDLOptions;
+ private StringBuilder optionBuilder;
+ ManualEnvironmentEdge injectEdge;
+ private int ttl;
+
+ private class ManualEnvironmentEdge extends EnvironmentEdge {
+ // A timestamp of 0 can have special meaning, so let's start with 1
+ protected long value = 1L;
+
+ public void setValue(long newValue) {
+ value = newValue;
+ }
+
+ public void incrementValue(long addedValue) {
+ value += addedValue;
+ }
+
+ @Override
+ public long currentTime() {
+ return this.value;
+ }
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+ props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+ props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void beforeTest(){
+ EnvironmentEdgeManager.reset();
+ optionBuilder = new StringBuilder();
+ this.tableDDLOptions = optionBuilder.toString();
+ ttl = 0;
+ injectEdge = new ManualEnvironmentEdge();
+ injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ }
+
+ @Test
+ public void testTooLowSCNWithMaxLookbackAge() throws Exception {
+ String dataTableName = generateUniqueName();
+ createTable(dataTableName);
+ //advance the clock far enough that we can find the syscat row for the table
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ populateTable(dataTableName);
+ long populateTime = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(populateTime));
+ try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+ connscn.createStatement().executeQuery("select * from " + dataTableName);
+ } catch (SQLException se) {
+ SQLExceptionCode code =
+ SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
+ TestUtil.assertSqlExceptionCode(code, se);
+ return;
+ }
+ Assert.fail("We should have thrown an exception for the too-early SCN");
+ }
+
+ @Test(timeout=120000L)
+ public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, 1);
+ TableName indexTable = TableName.valueOf(indexName);
+ //make sure the timestamp is after the inserts were committed
+ injectEdge.incrementValue(1);
+ long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(10); //make sure we delete at a different ts
+ Statement stmt = conn.createStatement();
+ stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
+ Assert.assertEquals(1, stmt.getUpdateCount());
+ conn.commit();
+ //select stmt to get row we deleted
+ String sql = String.format("SELECT * FROM %s WHERE id = 'a'", dataTableName);
+ String indexSql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName);
+ int rowsPlusDeleteMarker = ROWS_POPULATED;
+ assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+ flush(dataTable);
+ flush(indexTable);
+ assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+ long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(1); //new ts for major compaction
+ majorCompact(dataTable, beforeFirstCompactSCN);
+ majorCompact(indexTable, beforeFirstCompactSCN);
+ assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+ assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+ //wait for the lookback time. After this compactions should purge the deleted row
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+ long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+ String notDeletedRowSql =
+ String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
+ String notDeletedIndexRowSql =
+ String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName);
+ assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+ assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+ conn.createStatement().execute("upsert into " + dataTableName +
+ " values ('c', 'cd', 'cde', 'cdef')");
+ conn.commit();
+ majorCompact(dataTable, beforeSecondCompactSCN);
+ majorCompact(indexTable, beforeSecondCompactSCN);
+ //should still be ROWS_POPULATED because we added one and deleted one
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+
+ //the deleted row should be gone, but the non-deleted row should still be there.
+ assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, false);
+ assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+ assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+
+ }
+ }
+
+ @Test(timeout=60000L)
+ public void testTTLAndMaxLookbackAge() throws Exception {
+ ttl = 20;
+ optionBuilder.append("TTL=" + ttl);
+ tableDDLOptions = optionBuilder.toString();
+ Configuration conf = getUtility().getConfiguration();
+ //disable automatic memstore flushes
+ long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+ HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+ //increment to make sure we don't "look back" past table creation
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, 1);
+ injectEdge.incrementValue(1);
+ long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ TableName dataTable = TableName.valueOf(dataTableName);
+ TableName indexTable = TableName.valueOf(indexName);
+ assertTableHasTtl(conn, dataTable, ttl);
+ assertTableHasTtl(conn, indexTable, ttl);
+ //first make sure we inserted correctly
+ String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+ String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+ assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+ assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true);
+ int originalRowCount = 2;
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ //force a flush
+ flush(dataTable);
+ flush(indexTable);
+ //flush shouldn't have changed it
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+ long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) -
+ (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+ if (timeToAdvance > 0) {
+ injectEdge.incrementValue(timeToAdvance);
+ }
+ //make sure it's still on disk
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ injectEdge.incrementValue(1); //get a new timestamp for compaction
+ majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
+ majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+ //nothing should have been purged by this major compaction
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ //now wait the TTL
+ timeToAdvance = (ttl * 1000) -
+ (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+ if (timeToAdvance > 0) {
+ injectEdge.incrementValue(timeToAdvance);
+ }
+ //make sure that we can compact away the now-expired rows
+ majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
+ majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+ //note that before HBase 1.4 we don't have HBASE-17956,
+ // so this will always return 0 whether the rows are still on disk or not
+ assertRawRowCount(conn, dataTable, 0);
+ assertRawRowCount(conn, indexTable, 0);
+ } finally{
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testRecentMaxVersionsNotCompactedAway() throws Exception {
+ int versions = 2;
+ optionBuilder.append("VERSIONS=" + versions);
+ tableDDLOptions = optionBuilder.toString();
+ String firstValue = "abc";
+ String secondValue = "def";
+ String thirdValue = "ghi";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+ //increment to make sure we don't "look back" past table creation
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, versions);
+ injectEdge.incrementValue(1); //increment by 1 so we can see our write
+ long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ //make sure table and index metadata is set up right for versions
+ TableName dataTable = TableName.valueOf(dataTableName);
+ TableName indexTable = TableName.valueOf(indexName);
+ assertTableHasVersions(conn, dataTable, versions);
+ assertTableHasVersions(conn, indexTable, versions);
+ //check query optimizer is doing what we expect
+ String dataTableSelectSql =
+ String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+ String indexTableSelectSql =
+ String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+ assertExplainPlan(conn, indexTableSelectSql, dataTableName, indexName);
+ //make sure the data was inserted correctly in the first place
+ assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, afterInsertSCN, firstValue);
+ assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, afterInsertSCN, firstValue);
+ //force first update to get a distinct ts
+ injectEdge.incrementValue(1);
+ updateColumn(conn, dataTableName, "id", "a", "val2", secondValue);
+ injectEdge.incrementValue(1); //now make update visible
+ long afterFirstUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
+ //force second update to get a distinct ts
+ injectEdge.incrementValue(1);
+ updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue);
+ injectEdge.incrementValue(1);
+ long afterSecondUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(1);
+ //check to make sure we can see all three versions at the appropriate times
+ String[] allValues = {firstValue, secondValue, thirdValue};
+ long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN};
+ assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+ assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+ flush(dataTable);
+ flush(indexTable);
+ //after flush, check to make sure we can see all three versions at the appropriate times
+ assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+ assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+ majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
+ majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+ //after major compaction, check to make sure we can see all three versions
+ // at the appropriate times
+ assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
+ assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+ long afterLookbackAgeSCN = EnvironmentEdgeManager.currentTimeMillis();
+ majorCompact(dataTable, afterLookbackAgeSCN);
+ majorCompact(indexTable, afterLookbackAgeSCN);
+ //empty column, 1 version of val1, 3 versions of val2, 1 version of val3 = 6
+ assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
+ //2 versions of empty column, 2 versions of val2,
+ // 2 versions of val3 (since we write whole rows to index) = 6
+ assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
+ //empty column + 1 version each of val1, val2 and val3 = 4
+ assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
+ //1 version of empty column, 1 version of val2, 1 version of val3 = 3
+ assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3);
+ }
+ }
+
+ private void flush(TableName table) throws IOException {
+ Admin admin = getUtility().getHBaseAdmin();
+ admin.flush(table);
+ }
+
+ private void majorCompact(TableName table, long compactionRequestedSCN) throws Exception {
+ Admin admin = getUtility().getHBaseAdmin();
+ admin.majorCompact(table);
+ long lastCompactionTimestamp;
+ AdminProtos.GetRegionInfoResponse.CompactionState state = null;
+ while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table)) < compactionRequestedSCN
+ || (state = admin.getCompactionState(table)).
+ equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Last compaction time:" + lastCompactionTimestamp);
+ LOG.trace("CompactionState: " + state);
+ }
+ Thread.sleep(100);
+ }
+ }
+
+ private void assertMultiVersionLookbacks(String dataTableSelectSql,
+ String[] values, long[] scns)
+ throws Exception {
+ //make sure we can still look back after updating
+ for (int k = 0; k < values.length; k++){
+ assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, scns[k], values[k]);
+ }
+ }
+
+ private void updateColumn(Connection conn, String dataTableName,
+ String idColumn, String id, String valueColumn, String value)
+ throws SQLException {
+ String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
+ dataTableName, idColumn, valueColumn, id, value);
+ conn.createStatement().execute(upsertSql);
+ conn.commit();
+ }
+
+ private void createTable(String tableName) throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String createSql = "create table " + tableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), " +
+ "val2 varchar(10), val3 varchar(10))" + tableDDLOptions;
+ conn.createStatement().execute(createSql);
+ conn.commit();
+ }
+ }
+ private void populateTable(String tableName) throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+ conn.commit();
+ conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+ conn.commit();
+ }
+ }
+
+ private void createIndex(String dataTableName, String indexTableName, int indexVersions)
+ throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ //4.14 and below need to set TTL manually on indexes.
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)" +
+ " VERSIONS=" + indexVersions +
+ (ttl > 0 ? ", TTL=" + ttl : ""));
+ conn.commit();
+ }
+ }
+
+ public static void assertExplainPlan(Connection conn, String selectSql,
+ String dataTableFullName, String indexTableFullName) throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName);
+ }
+
+}
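
The SCN ("point in time") connections used throughout the test reduce to the
pattern below; a condensed sketch, where jdbcUrl, scn and sql stand in for the
test's values:

    Properties props = new Properties();
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
    try (Connection conn = DriverManager.getConnection(jdbcUrl, props);
         ResultSet rs = conn.createStatement().executeQuery(sql)) {
        // rs only sees rows as of timestamp scn
    }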
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
index 6c45b06..d79752e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
@@ -24,11 +24,17 @@ import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
import org.junit.Test;
public class SCNIT extends ParallelStatsDisabledIT {
@@ -67,7 +73,7 @@ public class SCNIT extends ParallelStatsDisabledIT {
rs.close();
}
props.clear();
- props.setProperty("CurrentSCN", Long.toString(timeAfterDelete));
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete));
try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
assertTrue(rs.next());
@@ -82,28 +88,68 @@ public class SCNIT extends ParallelStatsDisabledIT {
@Test
public void testSCNWithTTL() throws Exception {
+ int ttl = 2;
+ String fullTableName = createTableWithTTL(ttl);
+ //sleep for one second longer than ttl
+ Thread.sleep(ttl * 1000 + 1000);
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
+ try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+ ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
+ assertFalse(rs.next());
+ rs.close();
+ }
+ }
+
+ @Test
+ public void testTooLowSCNWithTTL() throws Exception {
+ //if the scn is older than the table's ttl, the query should throw a SQLException
+ int ttl = 2;
+ String fullTableName = createTableWithTTL(ttl);
+ int sleepTime = (ttl + 1)* 1000;
+ //sleep past the ttl; the SCN must still be recent enough to find the syscat row for the table
+ Thread.sleep(sleepTime);
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+ Long.toString(EnvironmentEdgeManager.currentTime() - sleepTime));
+ try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+ connscn.createStatement().
+ executeQuery(String.format("select * from %s", fullTableName));
+ } catch (SQLException se){
+ SQLExceptionCode code = SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL;
+ assertSqlExceptionCode(code, se);
+ return;
+ }
+ Assert.fail("We should have thrown an exception for the too-early SCN");
+ }
+
+ private void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) {
+ assertEquals(code.getErrorCode(), se.getErrorCode());
+ assertTrue("Wrong error message", se.getMessage().contains(code.getMessage()));
+ assertEquals(code.getSQLState(), se.getSQLState());
+ }
+
+ private String createTableWithTTL(int ttl) throws SQLException, InterruptedException {
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
+ StringBuilder optionsBuilder = new StringBuilder();
+ if (ttl > 0){
+ optionsBuilder.append("TTL=");
+ optionsBuilder.append(ttl);
+ }
+ String ddlOptions = optionsBuilder.toString();
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement()
- .execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) TTL=2");
+ .execute(String.format("CREATE TABLE %s" +
+ "(k VARCHAR PRIMARY KEY, v VARCHAR) %s", fullTableName, ddlOptions));
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
conn.commit();
- // TTL is 2 sec
- Thread.sleep(3000);
- }
-
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
- Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
- try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
- ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
- assertFalse(rs.next());
- rs.close();
}
+ return fullTableName;
}
}
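
For concreteness: with ttl = 2 the test sleeps (ttl + 1) * 1000 = 3000 ms and
connects at scn = now - 3000, so the TTL check added in QueryCompiler fires
(an illustrative restatement):

    long now = EnvironmentEdgeManager.currentTime();
    long scn = now - 3000L;                  // the CURRENT_SCN the test sets
    boolean tooOld = now - 2 * 1000L > scn;  // true -> error 537 (42914)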
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index c09cad7..d0cdb3d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -53,15 +54,12 @@ import com.google.common.collect.Lists;
public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
private static final Log LOG = LogFactory.getLog(GlobalIndexCheckerIT.class);
private final boolean async;
- private final String tableDDLOptions;
-
+ private String tableDDLOptions;
+ private StringBuilder optionBuilder;
+ private final boolean encoded;
public GlobalIndexCheckerIT(boolean async, boolean encoded) {
this.async = async;
- StringBuilder optionBuilder = new StringBuilder();
- if (!encoded) {
- optionBuilder.append(" COLUMN_ENCODED_BYTES=0 ");
- }
- this.tableDDLOptions = optionBuilder.toString();
+ this.encoded = encoded;
}
@BeforeClass
@@ -71,6 +69,15 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
+ @Before
+ public void beforeTest(){
+ optionBuilder = new StringBuilder();
+ if (!encoded) {
+ optionBuilder.append(" COLUMN_ENCODED_BYTES=0");
+ }
+ this.tableDDLOptions = optionBuilder.toString();
+ }
+
@Parameters(
name = "async={0},encoded={1}")
public static Collection<Object[]> data() {
@@ -305,17 +312,8 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
public void testOnePhaseOverwrite() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
- populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
String indexTableName = generateUniqueName();
- conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
- dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
- conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
- dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
- if (async) {
- // run the index MR job.
- IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
- IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
- }
+ createTableAndIndexes(conn, dataTableName, indexTableName);
// Configure IndexRegionObserver to skip the last two write phase (i.e., the data table update and post index
// update phase) and check that this does not impact the correctness (one overwrite)
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -385,21 +383,35 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ private void createTableAndIndexes(Connection conn, String dataTableName,
+ String indexTableName) throws Exception {
+ createTableAndIndexes(conn, dataTableName, indexTableName, 1);
+ }
+
+ private void createTableAndIndexes(Connection conn, String dataTableName,
+ String indexTableName, int indexVersions) throws Exception {
+ populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
+ dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") +
+ " VERSIONS=" + indexVersions);
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
+ dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")+
+ " VERSIONS=" + indexVersions);
+ conn.commit();
+ if (async) {
+ // run the index MR job.
+ IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
+ IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
+ }
+ }
+
@Test
public void testFailDataTableAndPostIndexRowUpdate() throws Exception {
- String dataTableName = generateUniqueName();
- populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+
try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
String indexName = generateUniqueName();
- conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
- dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
- conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
- dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
- if (async) {
- // run the index MR job.
- IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
- IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
- }
+ createTableAndIndexes(conn, dataTableName, indexName);
// Configure IndexRegionObserver to fail the last two write phase (i.e., the data table update and post index update phase)
// and check that this does not impact the correctness
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
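
With the setup factored out, both tests above create their data table and the
two indexes through the new helper; usage from this patch (the three-argument
overload defaults to VERSIONS=1):

    createTableAndIndexes(conn, dataTableName, indexTableName);     // VERSIONS=1
    createTableAndIndexes(conn, dataTableName, indexTableName, 2);  // keep 2 versions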
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
index e0d62a2..6854a75 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -20,28 +20,108 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.NavigableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.client.Scan;
public class ScanInfoUtil {
+ public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+ "phoenix.max.lookback.age.seconds";
+ public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;
+
private ScanInfoUtil() {
}
-
+
public static boolean isKeepDeletedCells(ScanInfo scanInfo) {
return scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE;
}
-
+
public static ScanInfo cloneScanInfoWithKeepDeletedCells(ScanInfo scanInfo) {
return new ScanInfo(scanInfo.getConfiguration(), scanInfo.getFamily(), scanInfo.getMinVersions(),
scanInfo.getMaxVersions(), scanInfo.getTtl(), KeepDeletedCells.TRUE,
scanInfo.getTimeToPurgeDeletes(), scanInfo.getComparator());
}
- public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,long readPt) throws IOException {
+ public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+ final NavigableSet<byte[]> columns,long readPt)
+ throws IOException {
if(!scan.isReversed()) {
return new StoreScanner(store, scanInfo, scan, columns,readPt);
} else {
return new ReversedStoreScanner(store, scanInfo, scan, columns,readPt);
}
}
+
+ public static long getTimeToLiveForCompactions(HColumnDescriptor columnDescriptor,
+ ScanInfo scanInfo) {
+ long ttl = scanInfo.getTtl();
+ long maxLookbackTtl = getMaxLookback(scanInfo.getConfiguration());
+ if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
+ if (ttl == Long.MAX_VALUE
+ && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
+ // If the user kept the default TTL (FOREVER) and KeepDeletedCells is FALSE
+ // or TTL, change the effective ttl to the max lookback age so that unwanted
+ // delete markers are purged once the window has passed
+ ttl = maxLookbackTtl;
+ } else {
+ //if there is an explicit TTL, retain data for the longer of the TTL and
+ // the max lookback age; the lookback window should not exceed the TTL
+ ttl = Math.max(ttl, maxLookbackTtl);
+ }
+ }
+
+ return ttl;
+ }
+
+ /*
+ * If KeepDeletedCells is FALSE or TTL,
+ * let delete markers age out once the lookback window has passed.
+ */
+ private static KeepDeletedCells getKeepDeletedCells(final Store store, ScanType scanType) {
+ //if we're doing a minor compaction or flush, always set keep deleted cells
+ //to true. Otherwise, if keep deleted cells is false or TTL, use KeepDeletedCells TTL,
+ //where the value of the ttl might be overridden to the max lookback age elsewhere
+ return (store.getFamily().getKeepDeletedCells() == KeepDeletedCells.TRUE
+ || scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ?
+ KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
+ }
+
+ /*
+ * if the user set a TTL, we should leave MIN_VERSIONS at the default (0 in most cases).
+ * Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want
+ * Math.max(maxVersions, minVersions, 1)
+ */
+ private static int getMinVersions(ScanInfo oldScanInfo, final Store store) {
+ return oldScanInfo.getTtl() != Long.MAX_VALUE ? store.getFamily().getMinVersions()
+ : Math.max(Math.max(store.getFamily().getMinVersions(),
+ store.getFamily().getMaxVersions()),1);
+ }
+
+ public static ScanInfo getScanInfoForFlushesAndCompactions(Configuration conf,
+ ScanInfo oldScanInfo,
+ final Store store,
+ ScanType type) {
+ long ttl = getTimeToLiveForCompactions(store.getFamily(), oldScanInfo);
+ KeepDeletedCells keepDeletedCells = getKeepDeletedCells(store, type);
+ int minVersions = getMinVersions(oldScanInfo, store);
+ return new ScanInfo(conf,store.getFamily().getName(), minVersions,
+ Integer.MAX_VALUE, ttl, keepDeletedCells,
+ oldScanInfo.getTimeToPurgeDeletes(),
+ oldScanInfo.getComparator());
+ }
+
+ private static long getMaxLookback(Configuration conf){
+ //config param is in seconds, switch to millis
+ return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
+ }
+
+ public static boolean isMaxLookbackTimeEnabled(Configuration conf){
+ return isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
+ }
+
+ private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+ return maxLookbackTime > 0L;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3e5f5ee..936b7a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -25,13 +25,18 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
import org.apache.phoenix.compile.JoinCompiler.JoinTable;
import org.apache.phoenix.compile.JoinCompiler.Table;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
import org.apache.phoenix.execute.ClientScanPlan;
@@ -66,6 +71,7 @@ import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SubqueryParseNode;
import org.apache.phoenix.parse.TableNode;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -74,6 +80,7 @@ import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
@@ -146,6 +153,7 @@ public class QueryCompiler {
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
public QueryPlan compile() throws SQLException{
+ verifySCN();
QueryPlan plan;
if (select.isUnion()) {
plan = compileUnionAll(select);
@@ -155,6 +163,52 @@ public class QueryCompiler {
return plan;
}
+ private void verifySCN() throws SQLException {
+ PhoenixConnection conn = statement.getConnection();
+ Long scn = conn.getSCN();
+ if (scn == null) {
+ return;
+ }
+ List<TableRef> scnTooOldTableRefs = new ArrayList<TableRef>();
+ ColumnResolver resolver =
+ FromCompiler.getResolverForQuery(select, conn);
+ List<TableRef> involvedTables = resolver.getTables();
+ int maxLookBackAge = conn.getQueryServices().
+ getConfiguration().getInt(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ if (maxLookBackAge > 0 && now - maxLookBackAge * 1000L > scn){
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE)
+ .build().buildException();
+ }
+ for (TableRef tableRef : involvedTables) {
+ byte[] tableQualifier = tableRef.getTable().getPhysicalName().getBytes();
+ //we can have a tableRef with an empty table, e.g. when selecting from a sequence
+ if (tableQualifier.length > 0) {
+ HTableDescriptor td = conn.getQueryServices().getTableDescriptor(tableQualifier);
+ HColumnDescriptor[] cds = td.getColumnFamilies();
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ if (cds.length > 0){
+ //Phoenix only allows a single table-level TTL, so any CF will do
+ if (now - cds[0].getTimeToLive() * 1000L > scn) {
+ scnTooOldTableRefs.add(tableRef);
+ }
+ }
+ }
+ }
+ if (scnTooOldTableRefs.size() > 0) {
+ TableRef tableRef = scnTooOldTableRefs.get(0);
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL)
+ .setSchemaName(tableRef.getTable().getSchemaName().getString())
+ .setTableName(tableRef.getTable().getTableName().getString())
+ .build()
+ .buildException();
+ }
+
+ }
+
public QueryPlan compileUnionAll(SelectStatement select) throws SQLException {
List<SelectStatement> unionAllSelects = select.getSelects();
List<QueryPlan> plans = new ArrayList<QueryPlan>();
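
Both rejection conditions in verifySCN() are plain clock arithmetic;
illustrative values with a 15 s max lookback and a 20 s table TTL:

    long now = 1_000_000L;
    long scn = 980_000L;                            // 20 s in the past
    boolean pastLookback = now - 15 * 1000L > scn;  // true  -> error 538
    boolean pastTtl      = now - 20 * 1000L > scn;  // false -> TTL check passes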
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 3da3f6b..d229b28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -18,13 +18,18 @@
package org.apache.phoenix.coprocessor;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -32,14 +37,17 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -56,6 +64,7 @@ import org.apache.phoenix.util.TransactionUtil;
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
+ private static final Log LOG = LogFactory.getLog(BaseScannerRegionObserver.class);
public static final String AGGREGATORS = "_Aggs";
public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions";
@@ -124,7 +133,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
// In case of Index Write failure, we need to determine that Index mutation
// is part of normal client write or Index Rebuilder. # PHOENIX-5080
public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3);
-
public enum ReplayWrite {
TABLE_AND_INDEX,
INDEX_ONLY,
@@ -368,20 +376,107 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}
+
@Override
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
- final KeyValueScanner s) throws IOException {
+ final Store store, final Scan scan,
+ final NavigableSet<byte[]> targetCols,
+ final KeyValueScanner s) throws IOException {
+ if (storeFileScanDoesntNeedAlteration(store, scan)) {
+ return s;
+ }
- if (scan.isRaw() || ScanInfoUtil.isKeepDeletedCells(store.getScanInfo()) || scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP || TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax())) {
- return s;
- }
-
- if (s!=null) {
- s.close();
- }
- ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
- return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
- c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+ if (s != null) {
+ s.close();
+ }
+ ScanInfo scanInfo = ScanInfoUtil.cloneScanInfoWithKeepDeletedCells(store.getScanInfo());
+ return ScanInfoUtil.createStoreScanner(store, scanInfo, scan, targetCols,
+ c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel()));
+ }
+
+ private boolean storeFileScanDoesntNeedAlteration(Store store, Scan scan) {
+ boolean isRaw = scan.isRaw();
+ //true if keep deleted cells is either TRUE or TTL
+ boolean keepDeletedCells = ScanInfoUtil.isKeepDeletedCells(store.getScanInfo());
+ boolean timeRangeIsLatest = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
+ boolean timestampIsTransactional =
+ TransactionUtil.isTransactionalTimestamp(scan.getTimeRange().getMax());
+ return isRaw
+ || keepDeletedCells
+ || timeRangeIsLatest
+ || timestampIsTransactional;
+ }
+
+ @Override
+ public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store,
+ final KeyValueScanner memstoreScanner,
+ final InternalScanner s)
+ throws IOException {
+
+ if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+ return s;
+ }
+
+ //close last scanner object before creating a new one
+ if(s != null) {
+ s.close();
+ }
+
+ // Called during flushing the memstore to disk.
+ // Need to retain all the delete markers & all the versions
+ Scan scan = new Scan();
+ scan.setMaxVersions(Integer.MAX_VALUE);
+ ScanInfo oldScanInfo = store.getScanInfo();
+
+ Configuration conf = c.getEnvironment().getConfiguration();
+ //minor compactions and flushes both use "compact retain deletes"
+ ScanType scanType = ScanType.COMPACT_RETAIN_DELETES;
+ ScanInfo scanInfo =
+ ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo, store, scanType);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating the store scanner with :" + scanInfo + ", " +
+ "scan object:" + scan + " for table " + store.getTableName().getNameAsString() +
+ " and region " + store.getRegionInfo().getRegionNameAsString() +
+ " and cf " + store.getColumnFamilyName());
+ }
+ return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+ scanType, store.getSmallestReadPoint(),
+ HConstants.LATEST_TIMESTAMP);
+ }
+
+ @Override
+ public InternalScanner preCompactScannerOpen(
+ final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+ final InternalScanner s) throws IOException {
+
+ if (!ScanInfoUtil.isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+ return s;
+ }
+ //close last scanner object before creating a new one
+ if(s != null) {
+ s.close();
+ }
+ Scan scan = new Scan();
+ scan.setMaxVersions(Integer.MAX_VALUE);
+ ScanInfo oldScanInfo = store.getScanInfo();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compaction triggering for table:" +
+ store.getRegionInfo().getTable().toString()
+ + " with scanType " + scanType + " for table " +
+ store.getTableName().getNameAsString() + " and region " +
+ store.getRegionInfo().getRegionNameAsString() +
+ " and cf " + store.getColumnFamilyName());
+ }
+
+ Configuration conf = c.getEnvironment().getConfiguration();
+ ScanInfo scanInfo =
+ ScanInfoUtil.getScanInfoForFlushesAndCompactions(conf, oldScanInfo,
+ store, scanType);
+ return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+ store.getSmallestReadPoint(),
+ earliestPutTs);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b597c49..2ee4831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
@@ -1335,9 +1336,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// This will lead to failure of cross cluster RPC if the effective user is not
// the login user. Switch to the login user context to ensure we have the expected
// security context.
+
final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
// since we will make a call to syscat, do nothing if we are compacting syscat itself
- if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
+ // also, if max lookback age is configured, delete-marker retention is already
+ // handled elsewhere, unless the index stays disabled beyond the max lookback
+ // age, in which case a full rebuild is probably wanted anyway
+ if (request.isMajor() &&
+ !ScanInfoUtil.isMaxLookbackTimeEnabled(compactionConfig) &&
+ !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 94b9c39..42acab8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -184,7 +184,12 @@ public enum SQLExceptionCode {
INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
UNEQUAL_SCN_AND_BUILD_INDEX_AT(534, "42911", "If both specified, values of CURRENT_SCN and BUILD_INDEX_AT must be equal."),
ONLY_INDEX_UPDATABLE_AT_SCN(535, "42912", "Only an index may be updated when the BUILD_INDEX_AT property is specified"),
- /**
+ CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL(537, "42914",
+ "Cannot use SCN to look further back in the past beyond the TTL"),
+ CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE(538, "42915",
+ "Cannot use SCN to look further back in the past beyond the configured max lookback age"),
+
+ /**
* HBase and Phoenix specific implementation defined sub-classes.
* Column family related exceptions.
*
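
Callers can branch on the new codes in the usual way; a sketch using the
constants added above:

    try (Connection conn = DriverManager.getConnection(url, props)) {
        conn.createStatement().executeQuery(sql);
    } catch (SQLException e) {
        if (e.getErrorCode() == SQLExceptionCode
                .CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE
                .getErrorCode()) {
            // the SCN predates the configured max lookback window
        }
    }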
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index fcd81df..bbb556e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -537,4 +537,5 @@ public class GlobalIndexChecker extends BaseRegionObserver {
public void stop(CoprocessorEnvironment e) throws IOException {
this.hTableFactory.shutdown();
}
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index dee02d1..6a4a8f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -51,7 +51,7 @@ public class TransactionUtil {
}
public static boolean isTransactionalTimestamp(long ts) {
- return ts >= MAX_NON_TX_TIMESTAMP;
+ return ts >= MAX_NON_TX_TIMESTAMP && ts != HConstants.LATEST_TIMESTAMP;
}
public static boolean isDelete(Cell cell) {
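
The extra clause fixes a corner case: HConstants.LATEST_TIMESTAMP is
Long.MAX_VALUE, so any open-ended (non-SCN) scan trivially satisfied
ts >= MAX_NON_TX_TIMESTAMP and was misclassified as transactional; sketch:

    long ts = HConstants.LATEST_TIMESTAMP;
    // before this change: isTransactionalTimestamp(ts) was true (wrongly)
    // after this change:  the ts != LATEST_TIMESTAMP clause makes it false
    boolean transactional = TransactionUtil.isTransactionalTimestamp(ts);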
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c1901cb..9424379 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -48,14 +48,20 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -79,6 +85,7 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.AndExpression;
import org.apache.phoenix.expression.ByteBasedLikeExpression;
@@ -125,6 +132,7 @@ import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -842,6 +850,13 @@ public class TestUtil {
conn.createStatement().execute("create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true" + (extraProps.length() == 0 ? "" : ("," + extraProps)));
}
+ public static void dumpTable(Connection conn, TableName tableName)
+ throws SQLException, IOException{
+ ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HTableInterface table = cqs.getTable(tableName.getName());
+ dumpTable(table);
+ }
+
public static void dumpTable(HTableInterface table) throws IOException {
System.out.println("************ dumping " + table + " **************");
Scan s = new Scan();
@@ -854,7 +869,9 @@ public class TestUtil {
Cell current = null;
while (cellScanner.advance()) {
current = cellScanner.current();
- System.out.println(current);
+ System.out.println(current + "column= " +
+ Bytes.toString(CellUtil.cloneQualifier(current)) +
+ " val=" + Bytes.toString(CellUtil.cloneValue(current)));
}
}
}
@@ -884,6 +901,46 @@ public class TestUtil {
return rows;
}
+ public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
+ Scan s = new Scan();
+ s.setRaw(isRaw);
+ s.setMaxVersions();
+
+ CellCount cellCount = new CellCount();
+ try (ResultScanner scanner = table.getScanner(s)) {
+ Result result = null;
+ while ((result = scanner.next()) != null) {
+ CellScanner cellScanner = result.cellScanner();
+ Cell current = null;
+ while (cellScanner.advance()) {
+ current = cellScanner.current();
+ cellCount.addCell(Bytes.toString(CellUtil.cloneRow(current)));
+ }
+ }
+ }
+ return cellCount;
+ }
+
+ static class CellCount {
+ private Map<String, Integer> rowCountMap = new HashMap<String, Integer>();
+
+ void addCell(String key){
+ if (rowCountMap.containsKey(key)){
+ rowCountMap.put(key, rowCountMap.get(key) + 1);
+ } else {
+ rowCountMap.put(key, 1);
+ }
+ }
+
+ int getCellCount(String key){
+ if (rowCountMap.containsKey(key)){
+ return rowCountMap.get(key);
+ } else {
+ return 0;
+ }
+ }
+ }
+
public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
System.out.println("************ dumping index status for " + indexName + " **************");
@@ -1131,4 +1188,81 @@ public class TestUtil {
}
return false;
}
+
+
+ public static void assertSqlExceptionCode(SQLExceptionCode code, SQLException se) {
+ assertEquals(code.getErrorCode(), se.getErrorCode());
+ assertTrue("Wrong error message", se.getMessage().contains(code.getMessage()));
+ assertEquals(code.getSQLState(), se.getSQLState());
+ }
+
+ public static void assertTableHasTtl(Connection conn, TableName tableName, int ttl)
+ throws SQLException, IOException {
+ HColumnDescriptor cd = getColumnDescriptor(conn, tableName);
+ Assert.assertEquals(ttl, cd.getTimeToLive());
+ }
+
+ public static void assertTableHasVersions(Connection conn, TableName tableName, int versions)
+ throws SQLException, IOException {
+ HColumnDescriptor cd = getColumnDescriptor(conn, tableName);
+ Assert.assertEquals(versions, cd.getMaxVersions());
+ }
+
+ public static HColumnDescriptor getColumnDescriptor(Connection conn, TableName tableName)
+ throws SQLException, IOException {
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ HTableDescriptor td = admin.getTableDescriptor(tableName);
+ return td.getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+ }
+
+ public static void assertRawRowCount(Connection conn, TableName table, int expectedRowCount)
+ throws SQLException, IOException {
+ ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ int count = TestUtil.getRawRowCount(cqs.getTable(table.getName()));
+ assertEquals(expectedRowCount, count);
+ }
+
+ public static void assertRawCellCount(Connection conn, TableName tableName,
+ byte[] row, int expectedCellCount)
+ throws SQLException, IOException{
+ ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ Table table = cqs.getTable(tableName.getName());
+ CellCount cellCount = getCellCount(table, true);
+ int count = cellCount.getCellCount(Bytes.toString(row));
+ assertEquals(expectedCellCount, count);
+ }
+
+ public static void assertRowExistsAtSCN(String url, String sql, long scn, boolean shouldExist)
+ throws SQLException {
+ boolean rowExists = false;
+ Properties props = new Properties();
+ ResultSet rs;
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+ try (Connection conn = DriverManager.getConnection(url, props)){
+ rs = conn.createStatement().executeQuery(sql);
+ rowExists = rs.next();
+ if (shouldExist){
+ Assert.assertTrue("Row was not found at time " + scn +
+ " when it should have been",
+ rowExists);
+ } else {
+ Assert.assertFalse("Row was found at time " + scn +
+ " when it should not have been", rowExists);
+ }
+ }
+
+ }
+
+ public static void assertRowHasExpectedValueAtSCN(String url, String sql,
+ long scn, String value) throws SQLException {
+ Properties props = new Properties();
+ ResultSet rs;
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+ try (Connection conn = DriverManager.getConnection(url, props)){
+ rs = conn.createStatement().executeQuery(sql);
+ Assert.assertTrue("Value " + value + " does not exist at scn " + scn, rs.next());
+ Assert.assertEquals(value, rs.getString(1));
+ }
+
+ }
}
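
Taken together, the new helpers let an integration test assert directly on
on-disk HBase state; a sketch mirroring MaxLookbackIT (static imports from
TestUtil, values illustrative):

    TableName dataTable = TableName.valueOf(dataTableName);
    assertTableHasTtl(conn, dataTable, 20);                      // HBase-level TTL
    assertRawRowCount(conn, dataTable, 2);                       // raw-scan row count
    assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);  // per-row cell count
    assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);  // point-in-time read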