You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2024/03/11 20:08:19 UTC
(phoenix) branch master updated: PHOENIX-7006: Configure maxLookbackAge at table level (#1751)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 57415bed06 PHOENIX-7006: Configure maxLookbackAge at table level (#1751)
57415bed06 is described below
commit 57415bed0660e9c2ee697f81f34a533d3536acc4
Author: sanjeet006py <36...@users.noreply.github.com>
AuthorDate: Tue Mar 12 01:38:13 2024 +0530
PHOENIX-7006: Configure maxLookbackAge at table level (#1751)
---
.../org/apache/phoenix/compile/QueryCompiler.java | 17 +-
.../phoenix/compile/ServerBuildIndexCompiler.java | 1 +
.../ServerBuildTransformingTableCompiler.java | 1 +
.../BaseScannerRegionObserverConstants.java | 2 +-
.../coprocessorclient/MetaDataProtocol.java | 2 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 1 +
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 13 +-
.../org/apache/phoenix/query/QueryConstants.java | 2 +
.../org/apache/phoenix/schema/DelegateTable.java | 6 +
.../org/apache/phoenix/schema/MetaDataClient.java | 71 +++++-
.../java/org/apache/phoenix/schema/PTable.java | 5 +
.../java/org/apache/phoenix/schema/PTableImpl.java | 28 ++-
.../org/apache/phoenix/schema/TableProperty.java | 24 ++
.../phoenix/schema/transform/TransformClient.java | 1 +
.../java/org/apache/phoenix/util/MetaDataUtil.java | 14 ++
.../java/org/apache/phoenix/util/ScanUtil.java | 15 ++
phoenix-core-client/src/main/protobuf/PTable.proto | 1 +
.../coprocessor/BaseScannerRegionObserver.java | 50 +++-
.../coprocessor/GlobalIndexRegionScanner.java | 4 +-
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 119 +++++++++-
.../UngroupedAggregateRegionObserver.java | 4 +-
.../PhoenixServerBuildIndexInputFormat.java | 1 +
.../mapreduce/index/IndexScrutinyMapper.java | 3 +-
.../phoenix/mapreduce/index/IndexScrutinyTool.java | 31 +--
.../mapreduce/util/PhoenixConfigurationUtil.java | 14 ++
.../org/apache/phoenix/end2end/AlterTableIT.java | 106 ++++++++-
.../org/apache/phoenix/end2end/CreateTableIT.java | 78 +++++++
.../end2end/IndexRepairRegionScannerIT.java | 111 +++++----
.../phoenix/end2end/IndexScrutinyToolBaseIT.java | 7 +-
.../end2end/IndexScrutinyWithMaxLookbackIT.java | 120 +++++++---
.../end2end/IndexToolForNonTxGlobalIndexIT.java | 61 +++++
.../phoenix/end2end/MaxLookbackExtendedIT.java | 75 +++++-
.../org/apache/phoenix/end2end/MaxLookbackIT.java | 251 +++++++++++++++++++++
.../org/apache/phoenix/end2end/TableTTLIT.java | 34 ++-
.../it/java/org/apache/phoenix/end2end/ViewIT.java | 85 +++++++
.../org/apache/phoenix/end2end/ViewMetadataIT.java | 34 +++
.../phoenix/end2end/index/IndexMetadataIT.java | 24 ++
.../phoenix/end2end/transform/TransformToolIT.java | 144 +++++++++++-
.../java/org/apache/phoenix/query/BaseTest.java | 44 ++++
.../java/org/apache/phoenix/util/TestUtil.java | 4 +-
41 files changed, 1461 insertions(+), 150 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 168b404379..dcd9198f7a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -87,6 +87,8 @@ import org.apache.phoenix.util.ParseNodeUtil;
import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
@@ -190,8 +192,19 @@ public class QueryCompiler {
if (scn == null) {
return;
}
- long maxLookBackAgeInMillis =
- BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conn.getQueryServices().getConfiguration());
+ List<TableRef> involvedTables = resolver.getTables();
+ Long maxLookBackAgeInMillis = null;
+ for(TableRef tableRef: involvedTables) {
+ PTable table = tableRef.getTable();
+ if (maxLookBackAgeInMillis == null) {
+ maxLookBackAgeInMillis = table.getMaxLookbackAge();
+ }
+ else if (table.getMaxLookbackAge() != null) {
+ maxLookBackAgeInMillis = Long.min(maxLookBackAgeInMillis, table.getMaxLookbackAge());
+ }
+ }
+ Configuration conf = conn.getQueryServices().getConfiguration();
+ maxLookBackAgeInMillis = MetaDataUtil.getMaxLookbackAge(conf, maxLookBackAgeInMillis);
long now = EnvironmentEdgeManager.currentTimeMillis();
if (maxLookBackAgeInMillis > 0 && now - maxLookBackAgeInMillis > scn){
throw new SQLExceptionInfo.Builder(
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 69699c6e48..bd8dcd480c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -136,6 +136,7 @@ public class ServerBuildIndexCompiler {
} else {
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserverConstants.REBUILD_INDEXES, TRUE_BYTES);
+ ScanUtil.setScanAttributeForMaxLookbackAge(scan, dataTable.getMaxLookbackAge());
ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
scan.setAttribute(BaseScannerRegionObserverConstants.INDEX_REBUILD_PAGING, TRUE_BYTES);
// Serialize page row size only if we're overriding, else use server side value
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
index c1ed37b067..71f50beef6 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ServerBuildTransformingTableCompiler.java
@@ -73,6 +73,7 @@ public class ServerBuildTransformingTableCompiler extends ServerBuildIndexCompil
ScanUtil.annotateScanWithMetadataAttributes(dataTable, scan);
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserverConstants.REBUILD_INDEXES, TRUE_BYTES);
+ ScanUtil.setScanAttributeForMaxLookbackAge(scan, dataTable.getMaxLookbackAge());
ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
scan.setAttribute(BaseScannerRegionObserverConstants.INDEX_REBUILD_PAGING, TRUE_BYTES);
// Serialize page row size only if we're overriding, else use server side value
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index f0fac9c278..9446d372a3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -136,6 +136,7 @@ public class BaseScannerRegionObserverConstants {
public static final String INDEX_ROW_KEY = "_IndexRowKey";
public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
+ public static final String MAX_LOOKBACK_AGE = "MAX_LOOKBACK_AGE";
/**
* The scan attribute to provide the scan start rowkey for analyze table queries.
*/
@@ -165,7 +166,6 @@ public class BaseScannerRegionObserverConstants {
*/
public static final String SCAN_SERVER_RETURN_VALID_ROW_KEY = "_ScanServerValidRowKey";
-
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
// In case of Index Write failure, we need to determine that Index mutation
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
index 60a21a35e5..d28f76ad81 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
@@ -100,7 +100,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 38;
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 = MIN_TABLE_TIMESTAMP + 39;
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 = MIN_TABLE_TIMESTAMP + 40;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index c3596b4f11..5e76e47e53 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -359,6 +359,7 @@ public enum SQLExceptionCode {
+ "only if none of the child views extends primary key"),
VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key"
+ " only if none of the parents have indexes in the parent hierarchy"),
+ MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY(10957, "44A39", "Max lookback age can only be set for tables"),
/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index a511c990b2..3216656fbf 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -75,6 +75,7 @@ import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@@ -432,6 +433,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String SYSTEM_TRANSFORM_TABLE = "TRANSFORM";
public static final String SYSTEM_TRANSFORM_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TRANSFORM_TABLE);
+ public static final String MAX_LOOKBACK_AGE = BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE;
+ public static final byte[] MAX_LOOKBACK_AGE_BYTES = Bytes.toBytes(MAX_LOOKBACK_AGE);
//SYSTEM:LOG
public static final String SYSTEM_LOG_TABLE = "LOG";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 83e38dd0ab..00ea859f5b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4127,29 +4127,32 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0) {
metaConnection =
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 4,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 5,
PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 3,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 4,
PhoenixDatabaseMetaData.SCHEMA_VERSION + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 2,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 3,
PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 1,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 2,
PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 1,
PhoenixDatabaseMetaData.INDEX_WHERE + " "
+ PVarchar.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0,
+ PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE + " " + PLong.INSTANCE.getSqlTypeName());
UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection);
}
return metaConnection;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index a75cc7e117..32b2f4e26f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -175,6 +175,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE;
/**
*
@@ -372,6 +373,7 @@ public interface QueryConstants {
EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
STREAMING_TOPIC_NAME + " VARCHAR, \n" +
INDEX_WHERE + " VARCHAR, \n" +
+ MAX_LOOKBACK_AGE + " BIGINT, \n" +
// Column metadata (will be null for table row)
DATA_TYPE + " INTEGER," +
COLUMN_SIZE + " INTEGER," +
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 1a5d83e728..662206f419 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -424,6 +424,12 @@ public class DelegateTable implements PTable {
throws SQLException {
return delegate.getIndexWhereColumns(connection);
}
+
+ @Override
+ public Long getMaxLookbackAge() {
+ return delegate.getMaxLookbackAge();
+ }
+
@Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }
@Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ea738bfe55..0e78202c43 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -99,6 +99,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE;
import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
import static org.apache.phoenix.query.QueryServices.DEFAULT_DISABLE_VIEW_SUBTREE_VALIDATION;
import static org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
@@ -155,6 +156,7 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.HashSet;
+import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Table;
@@ -353,9 +355,10 @@ public class MetaDataClient {
PHYSICAL_TABLE_NAME + "," +
SCHEMA_VERSION + "," +
STREAMING_TOPIC_NAME + "," +
- INDEX_WHERE +
+ INDEX_WHERE + "," +
+ MAX_LOOKBACK_AGE +
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
- "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
+ "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -2111,6 +2114,16 @@ public class MetaDataClient {
String schemaVersion = (String) TableProperty.SCHEMA_VERSION.getValue(tableProps);
String streamingTopicName = (String) TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps);
+ Long maxLookbackAge = (Long) TableProperty.MAX_LOOKBACK_AGE.getValue(tableProps);
+
+ if (maxLookbackAge != null && tableType != TABLE) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+ MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY)
+ .setSchemaName(schemaName)
+ .setTableName(tableName)
+ .build()
+ .buildException();
+ }
if (parent != null && tableType == PTableType.INDEX) {
timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider);
@@ -3153,6 +3166,12 @@ public class MetaDataClient {
} else {
tableUpsert.setNull(36, Types.VARCHAR);
}
+ if (maxLookbackAge == null) {
+ tableUpsert.setNull(37, Types.BIGINT);
+ }
+ else {
+ tableUpsert.setLong(37, maxLookbackAge);
+ }
tableUpsert.execute();
if (asyncCreatedDate != null) {
@@ -3302,6 +3321,7 @@ public class MetaDataClient {
.setStreamingTopicName(streamingTopicName)
.setIndexWhere(statement.getWhereClause() == null ? null
: statement.getWhereClause().toString())
+ .setMaxLookbackAge(maxLookbackAge)
.build();
result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
addTableToCache(result);
@@ -3759,7 +3779,8 @@ public class MetaDataClient {
metaPropertiesEvaluated.getPhysicalTableName(),
metaPropertiesEvaluated.getSchemaVersion(),
metaPropertiesEvaluated.getColumnEncodedBytes(),
- metaPropertiesEvaluated.getStreamingTopicName());
+ metaPropertiesEvaluated.getStreamingTopicName(),
+ metaPropertiesEvaluated.getMaxLookbackAge());
}
private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional,
@@ -3767,7 +3788,7 @@ public class MetaDataClient {
String schemaVersion, QualifierEncodingScheme columnEncodedBytes) throws SQLException {
return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null,
updateCacheFrequency, null, null, null, null, -1L, null, null, null,phoenixTTL, false, physicalTableName,
- schemaVersion, columnEncodedBytes, null);
+ schemaVersion, columnEncodedBytes, null, null);
}
private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
@@ -3776,7 +3797,7 @@ public class MetaDataClient {
Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema,
ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization,
Long phoenixTTL, Boolean isChangeDetectionEnabled, String physicalTableName, String schemaVersion,
- QualifierEncodingScheme columnEncodedBytes, String streamingTopicName)
+ QualifierEncodingScheme columnEncodedBytes, String streamingTopicName, Long maxLookbackAge)
throws SQLException {
String schemaName = table.getSchemaName().getString();
String tableName = table.getTableName().getString();
@@ -3847,6 +3868,9 @@ public class MetaDataClient {
if (!Strings.isNullOrEmpty(streamingTopicName)) {
mutateStringProperty(connection, tenantId, schemaName, tableName, STREAMING_TOPIC_NAME, streamingTopicName);
}
+ if (maxLookbackAge != null) {
+ mutateLongProperty(connection, tenantId, schemaName, tableName, MAX_LOOKBACK_AGE, maxLookbackAge);
+ }
return seqNum;
}
@@ -5356,6 +5380,8 @@ public class MetaDataClient {
metaProperties.setSchemaVersion((String) value);
} else if (propName.equalsIgnoreCase(STREAMING_TOPIC_NAME)) {
metaProperties.setStreamingTopicName((String) value);
+ } else if (propName.equalsIgnoreCase(MAX_LOOKBACK_AGE)) {
+ metaProperties.setMaxLookbackAge((Long) value);
}
}
// if removeTableProps is true only add the property if it is not an HTable or Phoenix Table property
@@ -5564,6 +5590,21 @@ public class MetaDataClient {
}
}
+ if (metaProperties.getMaxLookbackAge() != null) {
+ if (table.getType() != TABLE) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+ MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY)
+ .setSchemaName(schemaName)
+ .setTableName(tableName)
+ .build()
+ .buildException();
+ }
+ if (! Objects.equals(metaProperties.getMaxLookbackAge(), table.getMaxLookbackAge())) {
+ metaPropertiesEvaluated.setMaxLookbackAge(metaProperties.getMaxLookbackAge());
+ changingPhoenixTableProperty = true;
+ }
+ }
+
return changingPhoenixTableProperty;
}
@@ -5588,6 +5629,8 @@ public class MetaDataClient {
private String schemaVersion = null;
private String streamingTopicName = null;
+ private Long maxLookbackAge = null;
+
public Boolean getImmutableRowsProp() {
return isImmutableRowsProp;
}
@@ -5735,6 +5778,14 @@ public class MetaDataClient {
public void setStreamingTopicName(String streamingTopicName) {
this.streamingTopicName = streamingTopicName;
}
+
+ public Long getMaxLookbackAge() {
+ return maxLookbackAge;
+ }
+
+ public void setMaxLookbackAge(Long maxLookbackAge) {
+ this.maxLookbackAge = maxLookbackAge;
+ }
}
private static class MetaPropertiesEvaluated {
@@ -5756,6 +5807,8 @@ public class MetaDataClient {
private String schemaVersion = null;
private String streamingTopicName = null;
+ private Long maxLookbackAge = null;
+
public Boolean getIsImmutableRows() {
return isImmutableRows;
}
@@ -5884,6 +5937,14 @@ public class MetaDataClient {
public void setStreamingTopicName(String streamingTopicName) {
this.streamingTopicName = streamingTopicName;
}
+
+ public Long getMaxLookbackAge() {
+ return maxLookbackAge;
+ }
+
+ public void setMaxLookbackAge(Long maxLookbackAge) {
+ this.maxLookbackAge = maxLookbackAge;
+ }
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 6fd89ad4bb..9407c5f86c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -997,6 +997,11 @@ public interface PTable extends PMetaDataEntity {
* @throws SQLException
*/
Set<ColumnReference> getIndexWhereColumns(PhoenixConnection connection) throws SQLException;
+
+ /**
+ * Returns: Table level max lookback age if configured else null.
+ */
+ Long getMaxLookbackAge();
/**
* Class to help track encoded column qualifier counters per column family.
*/
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index b5c2eee2e7..128bca0d7f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -30,6 +30,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
@@ -218,6 +219,7 @@ public class PTableImpl implements PTable {
private String indexWhere;
private Expression indexWhereExpression;
private Set<ColumnReference> indexWhereColumns;
+ private Long maxLookbackAge;
public static class Builder {
private PTableKey key;
@@ -284,6 +286,7 @@ public class PTableImpl implements PTable {
private String externalSchemaId;
private String streamingTopicName;
private String indexWhere;
+ private Long maxLookbackAge;
// Used to denote which properties a view has explicitly modified
private BitSet viewModifiedPropSet = new BitSet(3);
@@ -711,6 +714,14 @@ public class PTableImpl implements PTable {
return this;
}
+ public Builder setMaxLookbackAge(Long maxLookbackAge) {
+ if (maxLookbackAge != null) {
+ propertyValues.put(MAX_LOOKBACK_AGE, String.valueOf(maxLookbackAge));
+ }
+ this.maxLookbackAge = maxLookbackAge;
+ return this;
+ }
+
/**
* Populate derivable attributes of the PTable
* @return PTableImpl.Builder object
@@ -1002,6 +1013,7 @@ public class PTableImpl implements PTable {
this.externalSchemaId = builder.externalSchemaId;
this.streamingTopicName = builder.streamingTopicName;
this.indexWhere = builder.indexWhere;
+ this.maxLookbackAge = builder.maxLookbackAge;
}
// When cloning table, ignore the salt column as it will be added back in the constructor
@@ -1082,7 +1094,8 @@ public class PTableImpl implements PTable {
.setSchemaVersion(table.getSchemaVersion())
.setExternalSchemaId(table.getExternalSchemaId())
.setStreamingTopicName(table.getStreamingTopicName())
- .setIndexWhere(table.getIndexWhere());
+ .setIndexWhere(table.getIndexWhere())
+ .setMaxLookbackAge(table.getMaxLookbackAge());
}
@Override
@@ -2028,6 +2041,10 @@ public class PTableImpl implements PTable {
indexWhere =
(String) PVarchar.INSTANCE.toObject(table.getIndexWhere().toByteArray());
}
+ Long maxLookbackAge = null;
+ if (table.hasMaxLookbackAge()) {
+ maxLookbackAge = table.getMaxLookbackAge();
+ }
try {
return new PTableImpl.Builder()
.setType(tableType)
@@ -2086,6 +2103,7 @@ public class PTableImpl implements PTable {
.setExternalSchemaId(externalSchemaId)
.setStreamingTopicName(streamingTopicName)
.setIndexWhere(indexWhere)
+ .setMaxLookbackAge(maxLookbackAge)
.build();
} catch (SQLException e) {
throw new RuntimeException(e); // Impossible
@@ -2229,6 +2247,9 @@ public class PTableImpl implements PTable {
builder.setIndexWhere(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(
table.getIndexWhere())));
}
+ if (table.getMaxLookbackAge() != null) {
+ builder.setMaxLookbackAge(table.getMaxLookbackAge());
+ }
return builder.build();
}
@@ -2376,6 +2397,11 @@ public class PTableImpl implements PTable {
return indexWhere;
}
+ @Override
+ public Long getMaxLookbackAge() {
+ return maxLookbackAge;
+ }
+
private void buildIndexWhereExpression(PhoenixConnection connection) throws SQLException {
PhoenixPreparedStatement
pstmt =
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 24d0432543..f3ffea9221 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.schema.types.PLong;
public enum TableProperty {
@@ -336,6 +337,29 @@ public enum TableProperty {
@Override public Object getPTableValue(PTable table) {
return table.getStreamingTopicName();
}
+ },
+
+ MAX_LOOKBACK_AGE(PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE, true, false, false) {
+ @Override
+ public Object getValue(Object value) {
+ if (value == null) {
+ return null;
+ }
+ else if (value instanceof Integer) {
+ return Long.valueOf((Integer) value);
+ }
+ else if (value instanceof Long) {
+ return value;
+ }
+ else {
+ throw new IllegalArgumentException("Table level MAX_LOOKBACK_AGE should be a " + PLong.INSTANCE.getSqlTypeName() + " value in milli-seconds");
+ }
+ }
+
+ @Override
+ public Object getPTableValue(PTable table) {
+ return table.getMaxLookbackAge();
+ }
};
private final String propertyName;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
index 760ff5923d..6649882a0c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
@@ -272,6 +272,7 @@ public class TransformClient {
.setSchemaVersion(table.getSchemaVersion())
.setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
.setStreamingTopicName(table.getStreamingTopicName())
+ .setMaxLookbackAge(table.getMaxLookbackAge())
// Transformables
.setImmutableStorageScheme(
(changedProps.getImmutableStorageSchemeProp() != null? changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme()))
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
index 96a40ac9c4..2fa3ac109f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/MetaDataUtil.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PColumn;
@@ -82,6 +83,7 @@ import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.phoenix.thirdparty.com.google.common.collect.Iterables;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
public class MetaDataUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(MetaDataUtil.class);
@@ -1182,4 +1184,16 @@ public class MetaDataUtil {
connection.setAutoCommit(isAutoCommit);
}
}
+
+ /**
+ * @param conf Cluster configuration
+ * @param maxLookbackAge Input max lookback age
+ * @return Input max lookback age, if not null. If null, fallback to cluster level
+ * max lookback age. Will always return non-null long value.
+ */
+ public static long getMaxLookbackAge(Configuration conf, Long maxLookbackAge) {
+ Preconditions.checkNotNull(conf);
+ return maxLookbackAge != null ? maxLookbackAge :
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf);
+ }
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 0718066d9f..dd476653b2 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -108,6 +108,7 @@ import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.collect.Iterators;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
/**
*
@@ -1630,6 +1631,20 @@ public class ScanUtil {
return null;
}
+ public static void setScanAttributeForMaxLookbackAge(Scan scan, Long maxLookbackAge) {
+ Preconditions.checkNotNull(scan);
+ if (maxLookbackAge != null) {
+ scan.setAttribute(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE,
+ Bytes.toBytes(maxLookbackAge));
+ }
+ }
+
+ public static Long getMaxLookbackAgeFromScanAttribute(Scan scan) {
+ Preconditions.checkNotNull(scan);
+ byte[] maxLookbackAge = scan.getAttribute(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE);
+ return maxLookbackAge != null ? Bytes.toLong(maxLookbackAge) : null;
+ }
+
/**
* Verify whether the given row key is in the scan boundaries i.e. scan start and end keys.
*
diff --git a/phoenix-core-client/src/main/protobuf/PTable.proto b/phoenix-core-client/src/main/protobuf/PTable.proto
index 88617e3636..5159bafaed 100644
--- a/phoenix-core-client/src/main/protobuf/PTable.proto
+++ b/phoenix-core-client/src/main/protobuf/PTable.proto
@@ -118,6 +118,7 @@ message PTable {
optional PTable transformingNewTable=51;
optional bytes streamingTopicName=52;
optional bytes indexWhere=53;
+ optional int64 maxLookbackAge = 54;
}
message EncodedCQCounter {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index ab52dd69d4..72f92e8b95 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.coprocessor;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -55,11 +57,16 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
import org.apache.phoenix.iterate.RegionScannerFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.schema.PTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -366,9 +373,10 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
setScanOptionsForFlushesAndCompactions(options);
return;
}
- if (isMaxLookbackTimeEnabled(conf)) {
+ long maxLookbackAge = getMaxLookbackAge(c);
+ if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
- scanType);
+ scanType, maxLookbackAge);
}
}
@@ -380,9 +388,10 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
setScanOptionsForFlushesAndCompactions(options);
return;
}
- if (isMaxLookbackTimeEnabled(conf)) {
+ long maxLookbackAge = getMaxLookbackAge(c);
+ if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
- ScanType.COMPACT_RETAIN_DELETES);
+ ScanType.COMPACT_RETAIN_DELETES, maxLookbackAge);
}
}
@@ -395,7 +404,8 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
setScanOptionsForFlushesAndCompactions(options);
return;
}
- if (isMaxLookbackTimeEnabled(conf)) {
+ long maxLookbackAge = getMaxLookbackAge(c);
+ if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
MemoryCompactionPolicy inMemPolicy =
store.getColumnFamilyDescriptor().getInMemoryCompaction();
ScanType scanType;
@@ -408,7 +418,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
scanType = ScanType.COMPACT_RETAIN_DELETES;
}
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
- scanType);
+ scanType, maxLookbackAge);
}
}
@@ -486,10 +496,9 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
*/
public long getTimeToLiveForCompactions(Configuration conf,
ColumnFamilyDescriptor columnDescriptor,
- ScanOptions options) {
+ ScanOptions options, long maxLookbackTtl) {
long ttlConfigured = columnDescriptor.getTimeToLive();
long ttlInMillis = ttlConfigured * 1000;
- long maxLookbackTtl = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf);
if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
if (ttlConfigured == HConstants.FOREVER
&& columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
@@ -509,10 +518,10 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
public void setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(Configuration conf,
ScanOptions options,
final Store store,
- ScanType type) {
+ ScanType type, long maxLookbackAge) {
ColumnFamilyDescriptor cfDescriptor = store.getColumnFamilyDescriptor();
options.setTTL(getTimeToLiveForCompactions(conf, cfDescriptor,
- options));
+ options, maxLookbackAge));
options.setKeepDeletedCells(getKeepDeletedCells(options, type));
options.setMaxVersions(Integer.MAX_VALUE);
options.setMinVersions(getMinVersions(options, cfDescriptor));
@@ -527,6 +536,27 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
return maxLookbackTime > 0L;
}
+ private static long getMaxLookbackAge(ObserverContext<RegionCoprocessorEnvironment> c) {
+ TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable();
+ String fullTableName = tableName.getNameAsString();
+ Configuration conf = c.getEnvironment().getConfiguration();
+ PTable table;
+ try(PhoenixConnection conn = QueryUtil.getConnectionOnServer(
+ conf).unwrap(PhoenixConnection.class)) {
+ table = conn.getTableNoCache(fullTableName);
+ }
+ catch (SQLException e) {
+ if (e instanceof TableNotFoundException) {
+ LOGGER.debug("Ignoring HBase table that is not a Phoenix table: {}", fullTableName);
+ // non-Phoenix HBase tables won't be found, do nothing
+ } else {
+ LOGGER.error("Unable to fetch table level max lookback age for {}", fullTableName, e);
+ }
+ return MetaDataUtil.getMaxLookbackAge(conf, null);
+ }
+ return MetaDataUtil.getMaxLookbackAge(conf, table.getMaxLookbackAge());
+ }
+
public static boolean isPhoenixTableTTLEnabled(Configuration conf) {
return conf.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 5af9950aa6..cf39407334 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -74,6 +74,8 @@ import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -237,7 +239,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
}
// Create the following objects only for rebuilds by IndexTool
hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
- maxLookBackInMills = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(config);
+ maxLookBackInMills = MetaDataUtil.getMaxLookbackAge(config, ScanUtil.getMaxLookbackAgeFromScanAttribute(scan));
rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 4c60dccb26..bdba476a90 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -57,6 +57,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_HWM_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
@@ -83,6 +84,9 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_LOOKBACK_AGE_BYTES;
+import static org.apache.phoenix.schema.PTable.LinkType.PHYSICAL_TABLE;
+import static org.apache.phoenix.schema.PTable.LinkType.VIEW_INDEX_PARENT_TABLE;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
@@ -109,6 +113,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -130,6 +135,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
@@ -366,6 +372,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
private static final Cell INDEX_WHERE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, INDEX_WHERE_BYTES);
+ private static final Cell MAX_LOOKBACK_AGE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MAX_LOOKBACK_AGE_BYTES);
+
private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
EMPTY_KEYVALUE_KV,
TABLE_TYPE_KV,
@@ -405,7 +413,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
SCHEMA_VERSION_KV,
EXTERNAL_SCHEMA_ID_KV,
STREAMING_TOPIC_NAME_KV,
- INDEX_WHERE_KV
+ INDEX_WHERE_KV,
+ MAX_LOOKBACK_AGE_KV
);
static {
@@ -455,6 +464,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV);
private static final int INDEX_WHERE_INDEX =
TABLE_KV_COLUMNS.indexOf(INDEX_WHERE_KV);
+
+ private static final int MAX_LOOKBACK_AGE_INDEX = TABLE_KV_COLUMNS.indexOf(MAX_LOOKBACK_AGE_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
@@ -1430,6 +1441,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
: null;
builder.setIndexWhere(indexWhere != null ? indexWhere
: oldTable != null ? oldTable.getIndexWhere() : null);
+
+ Cell maxLookbackAgeKv = tableKeyValues[MAX_LOOKBACK_AGE_INDEX];
+ Long maxLookbackAge = maxLookbackAgeKv == null ? null :
+ PLong.INSTANCE.getCodec().decodeLong(maxLookbackAgeKv.getValueArray(),
+ maxLookbackAgeKv.getValueOffset(), SortOrder.getDefault());
+ if (tableType == PTableType.VIEW) {
+ byte[] viewKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+ schemaName == null ? null : schemaName.getBytes(), tableNameBytes);
+ maxLookbackAge = scanMaxLookbackAgeFromParent(viewKey, clientTimeStamp);
+ }
+
// Check the cell tag to see whether the view has modified this property
final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
HConstants.EMPTY_BYTE_ARRAY :
@@ -1463,6 +1485,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
PTable transformingNewTable = null;
boolean isRegularView = (tableType == PTableType.VIEW && viewType != ViewType.MAPPED);
+ boolean isViewIndex = false;
for (List<Cell> columnCellList : allColumnCellList) {
Cell colKv = columnCellList.get(LINK_TYPE_INDEX);
@@ -1546,6 +1569,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
}
}
+ else if (linkType == VIEW_INDEX_PARENT_TABLE) {
+ isViewIndex = true;
+ byte[] indexKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+ schemaName == null ? null : schemaName.getBytes(), tableNameBytes);
+ maxLookbackAge = scanMaxLookbackAgeFromParent(indexKey, clientTimeStamp);
+ }
} else {
long columnTimestamp =
columnCellList.get(0).getTimestamp() != HConstants.LATEST_TIMESTAMP ?
@@ -1557,6 +1586,13 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
isSalted, baseColumnCount, isRegularView, columnTimestamp);
}
}
+ if (tableType == INDEX && ! isViewIndex) {
+ byte[] tableKey = SchemaUtil.getTableKey(tenantId == null ? null : tenantId.getBytes(),
+ parentSchemaName == null ? null : parentSchemaName.getBytes(), parentTableName.getBytes());
+ maxLookbackAge = scanMaxLookbackAgeFromParent(tableKey, clientTimeStamp);
+ }
+ builder.setMaxLookbackAge(maxLookbackAge != null ? maxLookbackAge :
+ (oldTable != null ? oldTable.getMaxLookbackAge() : null));
builder.setEncodedCQCounter(cqCounter);
builder.setIndexes(indexes != null ? indexes : oldTable != null
@@ -1587,6 +1623,57 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
return builder.build();
}
+ private Long scanMaxLookbackAgeFromParent(byte[] key, long clientTimeStamp) throws IOException {
+ Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
+ try(Table sysCat = ServerUtil.getHTableForCoprocessorScan(this.env,
+ SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
+ ResultScanner scanner = sysCat.getScanner(scan)) {
+ Result result = scanner.next();
+ boolean startCheckingForLink = false;
+ byte[] parentTableKey = null;
+ do {
+ if (result == null) {
+ return null;
+ }
+ else if (startCheckingForLink) {
+ byte[] linkTypeBytes = result.getValue(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES);
+ if (linkTypeBytes != null) {
+ LinkType linkType = LinkType.fromSerializedValue(linkTypeBytes[0]);
+ int rowKeyColMetadataLength = 5;
+ byte[][] rowKeyMetaData = new byte[rowKeyColMetadataLength][];
+ getVarChars(result.getRow(), rowKeyColMetadataLength, rowKeyMetaData);
+ if (linkType == VIEW_INDEX_PARENT_TABLE) {
+ parentTableKey = getParentTableKeyFromChildRowKeyMetaData(rowKeyMetaData);
+ return scanMaxLookbackAgeFromParent(parentTableKey, clientTimeStamp);
+ }
+ else if (linkType == PHYSICAL_TABLE) {
+ parentTableKey = getParentTableKeyFromChildRowKeyMetaData(rowKeyMetaData);
+ }
+ }
+ }
+ else {
+ byte[] maxLookbackAgeInBytes = result.getValue(TABLE_FAMILY_BYTES, MAX_LOOKBACK_AGE_BYTES);
+ if (maxLookbackAgeInBytes != null) {
+ return PLong.INSTANCE.getCodec().decodeLong(maxLookbackAgeInBytes, 0, SortOrder.getDefault());
+ }
+ }
+ result = scanner.next();
+ startCheckingForLink = true;
+ } while (result != null);
+ return parentTableKey == null ? null : scanMaxLookbackAgeFromParent(parentTableKey, clientTimeStamp);
+ }
+ }
+
+ private byte[] getParentTableKeyFromChildRowKeyMetaData(byte[][] rowKeyMetaData) {
+ byte[] parentTenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
+ String parentSchema = SchemaUtil.getSchemaNameFromFullName(
+ rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
+ byte[] parentSchemaName = parentSchema != null ? parentSchema.getBytes(StandardCharsets.UTF_8) : null;
+ byte[] parentTableName = SchemaUtil.getTableNameFromFullName(
+ rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]).getBytes(StandardCharsets.UTF_8);
+ return SchemaUtil.getTableKey(parentTenantId, parentSchemaName, parentTableName);
+ }
+
private Long getViewIndexId(Cell[] tableKeyValues, PDataType viewIndexIdType) {
Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
return viewIndexIdKv == null ? null :
@@ -3389,8 +3476,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
&& result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
return result;
} else {
+ PTable oldTable = table;
table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP,
clientVersion);
+ if (table != null && hasInheritableTablePropertyChanged(table, oldTable)) {
+ invalidateAllChildTablesAndIndexes(table, childViews);
+ }
if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && type == PTableType.VIEW) {
try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
@@ -3422,6 +3513,32 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
}
+ private boolean hasInheritableTablePropertyChanged(PTable newTable, PTable oldTable) {
+ return ! Objects.equals(newTable.getMaxLookbackAge(), oldTable.getMaxLookbackAge());
+ }
+
+ private void invalidateAllChildTablesAndIndexes(PTable table, List<PTable> childViews) {
+ List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>();
+ if (table.getIndexes() != null) {
+ for(PTable index: table.getIndexes()) {
+ invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(index)));
+ }
+ }
+ for(PTable childView: childViews) {
+ invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(childView)));
+ if (childView.getIndexes() != null) {
+ for(PTable viewIndex: childView.getIndexes()) {
+ invalidateList.add(new ImmutableBytesPtr(SchemaUtil.getTableKey(viewIndex)));
+ }
+ }
+ }
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
+ GlobalCache.getInstance(this.env).getMetaDataCache();
+ for(ImmutableBytesPtr invalidateKey: invalidateList) {
+ metaDataCache.invalidate(invalidateKey);
+ }
+ }
+
/**
* Removes the table from the server side cache
*/
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 1eff655e41..c07e1e25c8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -118,6 +118,7 @@ import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
+import org.apache.phoenix.util.MetaDataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -640,7 +641,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (table != null && !isDisabled && isPhoenixTableTTLEnabled) {
internalScanner =
new CompactionScanner(c.getEnvironment(), store, scanner,
- BaseScannerRegionObserverConstants.getMaxLookbackInMillis(c.getEnvironment().getConfiguration()),
+ MetaDataUtil.getMaxLookbackAge(
+ c.getEnvironment().getConfiguration(), table.getMaxLookbackAge()),
SchemaUtil.getEmptyColumnFamily(table),
table.getEncodingScheme()
== PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 4c5ea62c7b..2492394494 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -131,6 +131,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserverConstants.REBUILD_INDEXES, TRUE_BYTES);
+ ScanUtil.setScanAttributeForMaxLookbackAge(scan, pIndexTable.getMaxLookbackAge());
ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
}
return plan;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 30db799e23..365ff9c59f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -170,7 +170,8 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
LOGGER.info("Target table base query: " + targetTableQuery);
md5 = MessageDigest.getInstance("MD5");
ttl = getTableTtl();
- maxLookbackAgeMillis = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
+ maxLookbackAgeMillis = MetaDataUtil.getMaxLookbackAge(configuration,
+ PhoenixConfigurationUtil.getMaxLookbackAge(configuration));
} catch (SQLException | NoSuchAlgorithmException e) {
tryClosingResourceSilently(this.outputUpsertStmt);
tryClosingResourceSilently(this.connection);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index 2dbbc2f997..cfd0adf880 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,6 +272,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
// set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny
configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts));
+ PhoenixConfigurationUtil.setMaxLookbackAge(configuration, pdataTable.getMaxLookbackAge());
// set the source table to either data or index table
SourceTargetColumnNames columnNames =
@@ -420,8 +422,6 @@ public class IndexScrutinyTool extends Configured implements Tool {
? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt()))
: EnvironmentEdgeManager.currentTimeMillis() - 60000;
- validateTimestamp(configuration, ts);
-
if (indexTable != null) {
if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
throw new IllegalArgumentException(String
@@ -429,6 +429,9 @@ public class IndexScrutinyTool extends Configured implements Tool {
}
}
+ PTable pDataTable = connection.unwrap(PhoenixConnection.class).getTable(qDataTable);
+ validateTimestamp(configuration, ts, pDataTable.getMaxLookbackAge());
+
String outputFormatOption = cmdLine.getOptionValue(OUTPUT_FORMAT_OPTION.getOpt());
OutputFormat outputFormat =
outputFormatOption != null
@@ -445,13 +448,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
Configuration outputConfiguration = HBaseConfiguration.create(configuration);
outputConfiguration.unset(PhoenixRuntime.TENANT_ID_ATTRIB);
try (Connection outputConn = ConnectionUtil.getOutputConnection(outputConfiguration)) {
- outputConn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
- outputConn.createStatement().
- execute(IndexScrutinyTableOutput.OUTPUT_TABLE_BEYOND_LOOKBACK_DDL);
- outputConn.createStatement()
- .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
- outputConn.createStatement().
- execute(IndexScrutinyTableOutput.OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL);
+ createScrutinyToolTables(outputConn);
}
}
@@ -517,16 +514,15 @@ public class IndexScrutinyTool extends Configured implements Tool {
}
}
- private void validateTimestamp(Configuration configuration, long ts) {
- long maxLookBackAge = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
+ private void validateTimestamp(Configuration configuration, long ts, Long dataTableMaxLookback) {
+ long maxLookBackAge = MetaDataUtil.getMaxLookbackAge(configuration, dataTableMaxLookback);
if (maxLookBackAge != BaseScannerRegionObserverConstants.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE * 1000L) {
long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
if (ts < minTimestamp){
throw new IllegalArgumentException("Index scrutiny can't look back past the configured" +
- "max lookback age: " + maxLookBackAge / 1000 + " seconds");
+ " max lookback age: " + maxLookBackAge / 1000 + " seconds");
}
}
-
}
@VisibleForTesting
@@ -539,4 +535,13 @@ public class IndexScrutinyTool extends Configured implements Tool {
System.exit(result);
}
+ public static void createScrutinyToolTables(Connection conn) throws Exception {
+ conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
+ conn.createStatement().
+ execute(IndexScrutinyTableOutput.OUTPUT_TABLE_BEYOND_LOOKBACK_DDL);
+ conn.createStatement()
+ .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
+ conn.createStatement().
+ execute(IndexScrutinyTableOutput.OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL);
+ }
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index eb5d2f51a7..8cfa5382db 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
@@ -920,4 +921,17 @@ public final class PhoenixConfigurationUtil {
return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
}
+
+ public static void setMaxLookbackAge(Configuration configuration, Long maxLookbackAge) {
+ Preconditions.checkNotNull(configuration);
+ if (maxLookbackAge != null) {
+ configuration.setLong(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE, maxLookbackAge);
+ }
+ }
+
+ public static Long getMaxLookbackAge(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ String maxLookbackAgeStr = configuration.get(BaseScannerRegionObserverConstants.MAX_LOOKBACK_AGE);
+ return maxLookbackAgeStr != null ? Long.valueOf(maxLookbackAgeStr) : null;
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 9f6be13c1b..81dd83ed1a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
import java.io.IOException;
import java.sql.Connection;
@@ -48,6 +49,7 @@ import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import java.time.Duration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -74,7 +76,6 @@ import org.apache.phoenix.schema.export.DefaultSchemaWriter;
import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -111,6 +112,8 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
private final boolean columnEncoded;
private static final Logger LOGGER = LoggerFactory.getLogger(AlterTableIT.class);
+ private final long oneDayInMillis = Duration.ofDays(1).toMillis();
+
public AlterTableIT(boolean columnEncoded) {
this.columnEncoded = columnEncoded;
this.tableDDLOptions = columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0";
@@ -1796,4 +1799,103 @@ public class AlterTableIT extends ParallelStatsDisabledIT {
newLastDDLTimestamp > oldLastDDLTimestamp);
}
}
-}
\ No newline at end of file
+
+ @Test
+ public void testChangeTableLevelMaxLookbackAge() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ Long maxLookbackAge = oneDayInMillis;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName +
+ " (a_string varchar not null PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE=" + maxLookbackAge;
+ conn.createStatement().execute(ddl);
+ assertMaxLookbackAge(fullTableName, maxLookbackAge);
+ maxLookbackAge = 3L * oneDayInMillis;
+ alterTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+ assertMaxLookbackAge(fullTableName, maxLookbackAge);
+ maxLookbackAge = 2L * oneDayInMillis;
+ alterTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+ assertMaxLookbackAge(fullTableName, maxLookbackAge);
+ maxLookbackAge = 0L;
+ alterTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+ assertMaxLookbackAge(fullTableName, maxLookbackAge);
+ }
+ }
+
+ @Test
+ public void testChangeTableLevelMaxLookbackAgeToInvalid() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "CREATE TABLE " + fullTableName +
+ " (a_string varchar not null PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE=" + oneDayInMillis;
+ conn.createStatement().execute(ddl);
+ }
+ assertThrows(IllegalArgumentException.class, () -> alterTableLevelMaxLookbackAge(fullTableName, "2309.3"));
+ assertThrows(IllegalArgumentException.class, () -> alterTableLevelMaxLookbackAge(fullTableName, "forty"));
+ }
+
+ @Test
+ public void testMaxLookbackAgeOfChildViewsAndIndexesWithAlterTable() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ Long maxLookbackAge = oneDayInMillis;
+ try(Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ String ddl = "CREATE TABLE " + fullDataTableName +
+ " (a_string varchar not null PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE=" + maxLookbackAge;
+ stmt.execute(ddl);
+ assertMaxLookbackAge(fullDataTableName, maxLookbackAge);
+ String indexName = generateUniqueName();
+ String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ ddl = "CREATE INDEX " + indexName + " ON " + fullDataTableName + " (COL1)";
+ stmt.execute(ddl);
+ assertMaxLookbackAge(fullIndexName, maxLookbackAge);
+ String childViewName = generateUniqueName();
+ String fullChildViewName = SchemaUtil.getTableName(schemaName, childViewName);
+ ddl = "CREATE VIEW " + fullChildViewName + " AS SELECT * FROM " + fullDataTableName;
+ stmt.execute(ddl);
+ assertMaxLookbackAge(fullChildViewName, maxLookbackAge);
+ String childViewIndexName = generateUniqueName();
+ String fullChildViewIndexName = SchemaUtil.getTableName(schemaName, childViewIndexName);
+ ddl = "CREATE INDEX " + childViewIndexName + " ON " + fullChildViewName + " (COL1)";
+ stmt.execute(ddl);
+ assertMaxLookbackAge(fullChildViewIndexName, maxLookbackAge);
+ String grandChildViewName = generateUniqueName();
+ String fullGrandChildViewName = SchemaUtil.getTableName(schemaName, grandChildViewName);
+ ddl = "CREATE VIEW " + fullGrandChildViewName + " (col2 varchar) AS SELECT * FROM " + fullChildViewName;
+ stmt.execute(ddl);
+ assertMaxLookbackAge(fullGrandChildViewName, maxLookbackAge);
+ String grandChildViewIndexName = generateUniqueName();
+ String fullGrandChildViewIndexName = SchemaUtil.getTableName(schemaName, grandChildViewIndexName);
+ ddl = "CREATE INDEX " + grandChildViewIndexName + " ON " + fullGrandChildViewName + " (COL2)";
+ stmt.execute(ddl);
+ assertMaxLookbackAge(fullGrandChildViewIndexName, maxLookbackAge);
+ maxLookbackAge = 2 * oneDayInMillis;
+ alterTableLevelMaxLookbackAge(fullDataTableName, maxLookbackAge.toString());
+ assertMaxLookbackAge(fullDataTableName, maxLookbackAge);
+ assertMaxLookbackAge(fullIndexName, maxLookbackAge);
+ assertMaxLookbackAge(fullChildViewName, maxLookbackAge);
+ assertMaxLookbackAge(fullChildViewIndexName, maxLookbackAge);
+ assertMaxLookbackAge(fullGrandChildViewName, maxLookbackAge);
+ assertMaxLookbackAge(fullGrandChildViewIndexName, maxLookbackAge);
+ }
+ }
+
+ private void assertMaxLookbackAge(String fullTableName, Long expectedMaxLookbackAge) throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ assertEquals(expectedMaxLookbackAge, pconn.getTableNoCache(fullTableName).getMaxLookbackAge());
+ }
+ }
+
+ private void alterTableLevelMaxLookbackAge(String fullTableName, String maxLookbackAge) throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = "ALTER TABLE " + fullTableName + " SET MAX_LOOKBACK_AGE = " + maxLookbackAge;
+ conn.createStatement().execute(ddl);
+ }
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 62b15cb9b6..d7473e5baf 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.mapreduce.index.IndexUpgradeTool.ROLLBACK_OP;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -26,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
import java.io.IOException;
import java.sql.Connection;
@@ -1280,6 +1282,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
.getColumnQualifierBytes()));
assertEquals(14, encodingScheme.decode(table.getColumnForColumnName("INT3")
.getColumnQualifierBytes()));
+ assertNull(table.getMaxLookbackAge());
}
@Test
@@ -1699,6 +1702,62 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testCreateTableWithTableLevelMaxLookbackAge() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ Long maxLookbackAge = 259200L;
+ createTableWithTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullTableName));
+ schemaName = generateUniqueName();
+ dataTableName = generateUniqueName();
+ fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ maxLookbackAge = 25920000000L;
+ createTableWithTableLevelMaxLookbackAge(fullTableName, maxLookbackAge.toString());
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullTableName));
+ String indexTableName = generateUniqueName();
+ createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName);
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(SchemaUtil.getTableName(schemaName, indexTableName)));
+ }
+
+ @Test
+ public void testCreateTableWithTableLevelMaxLookbackAgeAsNull() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ createTableWithTableLevelMaxLookbackAge(fullTableName, "NULL");
+ assertNull(queryTableLevelMaxLookbackAge(fullTableName));
+ String indexTableName = generateUniqueName();
+ createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName);
+ assertNull(queryTableLevelMaxLookbackAge(SchemaUtil.getTableName(schemaName, indexTableName)));
+ }
+
+ @Test
+ public void testCreateTableWithInvalidTableLevelMaxLookbackAge() {
+ String errMsg = "Table level MAX_LOOKBACK_AGE should be a BIGINT value in milli-seconds";
+ IllegalArgumentException err = assertThrows(IllegalArgumentException.class,
+ () -> createTableWithTableLevelMaxLookbackAge(
+ SchemaUtil.getTableName(generateUniqueName(), generateUniqueName()), "2.3"));
+ assertEquals(errMsg, err.getMessage());
+ err = assertThrows(IllegalArgumentException.class, () -> createTableWithTableLevelMaxLookbackAge(
+ SchemaUtil.getTableName(generateUniqueName(), generateUniqueName()), "three"));
+ assertEquals(errMsg, err.getMessage());
+ }
+
+ @Test
+ public void testCreateIndexWithTableLevelMaxLookbackAge() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ createTableWithTableLevelMaxLookbackAge(fullTableName, "NULL");
+ String indexOptions = "MAX_LOOKBACK_AGE=300";
+ SQLException err = assertThrows(SQLException.class,
+ () -> createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName, indexOptions));
+ assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+ }
+
public static long verifyLastDDLTimestamp(String tableFullName, long startTS, Connection conn) throws SQLException {
long endTS = EnvironmentEdgeManager.currentTimeMillis();
//Now try the PTable API
@@ -1727,4 +1786,23 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
}
}
+ private void createTableWithTableLevelMaxLookbackAge(String fullTableName, String maxLookbackAge) throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String createDdl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE="+maxLookbackAge;
+ conn.createStatement().execute(createDdl);
+ }
+ }
+
+ private void createIndexOnTableWithMaxLookbackAge(String indexTableName, String fullTableName, String indexOptions) throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String createIndexDdl = "CREATE INDEX " + indexTableName + " ON " + fullTableName + " (COL1) " +
+ (indexOptions == null ? "" : indexOptions);
+ conn.createStatement().execute(createIndexDdl);
+ }
+ }
+
+ private void createIndexOnTableWithMaxLookbackAge(String indexTableName, String fullTableName) throws Exception {
+ createIndexOnTableWithMaxLookbackAge(indexTableName, fullTableName, null);
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
index 603e2c2cb0..1e13f3a123 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
@@ -22,8 +22,6 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -48,7 +46,6 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.ManualEnvironmentEdge;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
@@ -69,7 +66,6 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
-import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -77,14 +73,7 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.*;
import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertArrayEquals;
@@ -99,14 +88,14 @@ import static org.junit.Assert.fail;
public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScannerIT.class);
- private final String tableDDLOptions;
+ private String tableDDLOptions;
private final String indexDDLOptions;
private boolean mutable;
+ StringBuilder optionBuilder = new StringBuilder();
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
public IndexRepairRegionScannerIT(boolean mutable, boolean singleCellIndex) {
- StringBuilder optionBuilder = new StringBuilder();
StringBuilder indexOptionBuilder = new StringBuilder();
this.mutable = mutable;
if (!mutable) {
@@ -296,12 +285,6 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
}
}
- static private void resetIndexRegionObserverFailPoints() {
- IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
- IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
- IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
- }
-
static private void commitWithException(Connection conn) {
try {
conn.commit();
@@ -843,29 +826,73 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
}
}
- public void deleteAllRows(Connection conn, TableName tableName) throws SQLException,
- IOException, InterruptedException {
- Scan scan = new Scan();
- Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().
- getAdmin();
- org.apache.hadoop.hbase.client.Connection hbaseConn = admin.getConnection();
- Table table = hbaseConn.getTable(tableName);
- boolean deletedRows = false;
- try (ResultScanner scanner = table.getScanner(scan)) {
- for (Result r : scanner) {
- Delete del = new Delete(r.getRow());
- table.delete(del);
- deletedRows = true;
- }
- } catch (Exception e) {
- //if the table doesn't exist, we have no rows to delete. Easier to catch
- //than to pre-check for existence
+ @Test
+ public void testInvalidIndexRowWithTableLevelMaxLookback() throws Exception {
+ if (!mutable) {
+ return;
}
- //don't flush/compact if we didn't write anything, because we'll hang forever
- if (deletedRows) {
- getUtility().getAdmin().flush(tableName);
- TestUtil.majorCompact(getUtility(), tableName);
+ final int NROWS = 4;
+ final int maxLookbackAge = 12000;
+ if ((optionBuilder.length() > 0 && optionBuilder.toString().trim().startsWith("SPLIT ON"))
+ || optionBuilder.length() == 0) {
+ optionBuilder.insert(0, "MAX_LOOKBACK_AGE=" + maxLookbackAge + " ");
}
- }
+ else {
+ optionBuilder.insert(0, "MAX_LOOKBACK_AGE=" + maxLookbackAge + ", ");
+ }
+ tableDDLOptions = optionBuilder.toString();
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions);
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+
+ PreparedStatement dataPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+ for (int i = 1; i <= NROWS; i++) {
+ dataPreparedStatement.setInt(1, i);
+ dataPreparedStatement.setInt(2, i + 1);
+ dataPreparedStatement.setInt(3, i * 2);
+ dataPreparedStatement.execute();
+ }
+ conn.commit();
+
+ ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
+ customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(customEdge);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ conn.createStatement().execute("UPSERT INTO " + dataTableFullName + " VALUES(3, 100, 200)");
+ conn.commit();
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ customEdge.incrementValue(5);
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+ assertEquals(2,
+ mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(2, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ conn.createStatement().execute("UPSERT INTO " + dataTableFullName + " VALUES(2, 102, 202)");
+ conn.commit();
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ customEdge.incrementValue(maxLookbackAge + 1);
+ indexTool = IndexToolIT.runIndexTool(false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+ mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(2,
+ mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(2, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+ }
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index 7f652c5fef..6d028d287c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -66,11 +66,16 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
protected List<Job> runScrutiny(Class<? extends IndexScrutinyMapper> mapperClass,
String[] cmdArgs) throws Exception {
+ return runScrutiny(mapperClass, cmdArgs, 0);
+ }
+
+ protected List<Job> runScrutiny(Class<? extends IndexScrutinyMapper> mapperClass,
+ String[] cmdArgs, int expectedStatus) throws Exception {
IndexScrutinyTool scrutiny = new IndexScrutinyTool(mapperClass);
Configuration conf = new Configuration(getUtility().getConfiguration());
scrutiny.setConf(conf);
int status = scrutiny.run(cmdArgs);
- assertEquals(0, status);
+ assertEquals(expectedStatus, status);
for (Job job : scrutiny.getJobs()) {
assertTrue(job.waitForCompletion(true));
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
index 114e6b68a7..71a2673fd5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.end2end;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.mapreduce.Counters;
@@ -26,7 +27,6 @@ import org.apache.phoenix.mapreduce.index.IndexScrutinyMapper;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.MetaDataUtil;
@@ -37,14 +37,21 @@ import org.apache.phoenix.util.TestUtil;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.After;
+import org.junit.experimental.categories.Category;
+import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT;
@@ -55,6 +62,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
private static PreparedStatement upsertDataStmt;
@@ -67,13 +76,24 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
private static boolean isViewIndex;
private static ManualEnvironmentEdge testClock;
public static final String UPSERT_DATA = "UPSERT INTO %s VALUES (?, ?, ?)";
- public static final String DELETE_SCRUTINY_METADATA = "DELETE FROM "
- + IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME;
- public static final String DELETE_SCRUTINY_OUTPUT = "DELETE FROM " +
- IndexScrutinyTableOutput.OUTPUT_TABLE_NAME;
- public static final int MAX_LOOKBACK = 6;
+ public static final int MAX_LOOKBACK = 12;
+ public static final int TABLE_LEVEL_MAX_LOOKBACK = 8;
private long scrutinyTs;
+ private final boolean hasTableLevelMaxLookback;
+ public IndexScrutinyWithMaxLookbackIT(boolean hasTableLevelMaxLookback) {
+ this.hasTableLevelMaxLookback = hasTableLevelMaxLookback;
+ }
+
+ @Parameterized.Parameters(name = "hasTableLevelMaxLookback={0}")
+ public static synchronized Collection<Object[]> data() {
+ List<Object[]> list = Lists.newArrayListWithExpectedSize(2);
+ boolean[] Booleans = new boolean[]{true, false};
+ for (boolean mutable : Booleans) {
+ list.add(new Object[]{mutable});
+ }
+ return list;
+ }
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -85,18 +105,27 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
}
@Before
- public void setupTest() throws SQLException {
+ public void setupTest() throws Exception {
try(Connection conn = DriverManager.getConnection(getUrl(),
- PropertiesUtil.deepCopy(TEST_PROPERTIES))){
- conn.createStatement().execute(DELETE_SCRUTINY_METADATA);
- conn.createStatement().execute(DELETE_SCRUTINY_OUTPUT);
- conn.commit();
- } catch (TableNotFoundException tnfe){
- //This will happen the first time
+ PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ IndexScrutinyTool.createScrutinyToolTables(conn);
+ }
+ }
+
+ @After
+ public void cleanup () throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ deleteAllRows(conn,
+ TableName.valueOf(IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME));
+ deleteAllRows(conn,
+ TableName.valueOf(IndexScrutinyTableOutput.OUTPUT_TABLE_NAME));
}
+ EnvironmentEdgeManager.reset();
}
@Test
+ @Ignore("Test is already broken")
public void testScrutinyOnRowsBeyondMaxLookBack() throws Exception {
setupTables();
try {
@@ -109,6 +138,7 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
}
@Test
+ @Ignore("Test is already broken")
public void testScrutinyOnRowsBeyondMaxLookback_viewIndex() throws Exception {
schema = "S"+generateUniqueName();
dataTableName = "T"+generateUniqueName();
@@ -160,11 +190,32 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
@Test
public void testScrutinyOnDeletedRowsBeyondMaxLookBack() throws Exception {
setupTables();
- try {
- upsertDataThenDeleteAndScrutinize(dataTableName, dataTableFullName, testClock);
- assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
- } finally {
- EnvironmentEdgeManager.reset();
+ upsertDataThenDeleteAndScrutinize(dataTableName, dataTableFullName, testClock);
+ assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
+ }
+
+ @Test
+ public void testSCNBeyondMaxLookback() throws Exception {
+ setupTables();
+ testClock.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(testClock);
+ long beforeInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ testClock.incrementValue(1);
+ try (Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ populateTable(dataTableFullName, conn);
+ testClock.incrementValue(1);
+ long maxLookbackAge = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK : MAX_LOOKBACK;
+ testClock.incrementValue(maxLookbackAge * 1000);
+ if (hasTableLevelMaxLookback) {
+ assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ beforeInsertSCN + MAX_LOOKBACK * 1000);
+ }
+ runScrutiny(schema, dataTableName, indexTableName, beforeInsertSCN, -1);
+ }
+ catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains(
+ "Index scrutiny can't look back past the configured max lookback age"));
}
}
@@ -177,6 +228,9 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
isViewIndex = false;
String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+ "ZIP INTEGER) COLUMN_ENCODED_BYTES=0, VERSIONS=1";
+ if (hasTableLevelMaxLookback) {
+ dataTableDDL += ", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK * 1000;
+ }
String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP)";
testClock = new ManualEnvironmentEdge();
@@ -234,17 +288,23 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
throws Exception {
try(Connection conn =
DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
- populateTable(tableFullName, conn);
- long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis() + 1;
- testClock.setValue(afterInsertSCN);
+ testClock.setValue(EnvironmentEdgeManager.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(testClock);
+ long beforeInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ testClock.incrementValue(1);
+ populateTable(tableFullName, conn);
+ testClock.incrementValue(1);
deleteIndexRows(conn);
//move forward to the time we want to scrutinize, which is less than max lookback age
//for the initial inserts
- testClock.incrementValue(MAX_LOOKBACK /2 * 1000);
+ long maxLookbackAge = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK : MAX_LOOKBACK;
+ testClock.incrementValue(maxLookbackAge / 2 * 1000);
scrutinyTs = EnvironmentEdgeManager.currentTimeMillis();
//now go past max lookback age for the initial 2 inserts
- testClock.incrementValue(MAX_LOOKBACK /2 * 1000);
+ testClock.incrementValue(maxLookbackAge / 2 * 1000);
+ if (hasTableLevelMaxLookback) {
+ assertTrue(EnvironmentEdgeManager.currentTimeMillis() < beforeInsertSCN + MAX_LOOKBACK * 1000);
+ }
List<Job> completedJobs = runScrutiny(schema, tableName, indexTableName, scrutinyTs);
Job job = completedJobs.get(0);
@@ -273,18 +333,24 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
Long scrutinyTs)
throws Exception {
- return runScrutiny(schemaName, dataTableName, indexTableName, null, null, scrutinyTs);
+ return runScrutiny(schemaName, dataTableName, indexTableName, null, null, scrutinyTs, 0);
+ }
+
+ private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+ Long scrutinyTs, int expectedStatus) throws Exception {
+ return runScrutiny(schemaName, dataTableName, indexTableName, null, null,
+ scrutinyTs, expectedStatus);
}
private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
Long batchSize, IndexScrutinyTool.SourceTable sourceTable,
- Long scrutinyTs) throws Exception {
+ Long scrutinyTs, int expectedStatus) throws Exception {
final String[]
cmdArgs =
getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
true, IndexScrutinyTool.OutputFormat.TABLE,
null, null, scrutinyTs);
- return runScrutiny(MaxLookbackIndexScrutinyMapper.class, cmdArgs);
+ return runScrutiny(MaxLookbackIndexScrutinyMapper.class, cmdArgs, expectedStatus);
}
private static class MaxLookbackIndexScrutinyMapper extends IndexScrutinyMapper {
@@ -309,4 +375,4 @@ public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
}
}
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index 47ad02419e..1620604c9f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
@@ -183,6 +184,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest {
//In HBase 2.2.6 and HBase 2.3.0+, helps region server assignments when under an injected
// edge clock. See HBASE-24511.
serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0));
+ // Need to set dispatcher delay to 0 to account for injected edge else queue might get stuck
+ serverProps.put("hbase.procedure.remote.dispatcher.delay.msec", Integer.toString(0));
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(4);
clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
@@ -1478,6 +1481,64 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest {
}
}
+ @Test
+ public void testMissingIndexRowsBeyondTableLevelMaxLookback() throws Exception {
+ if (! mutable) {
+ return;
+ }
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ int maxLookbackAge = 12000;
+ int delta = 5;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + fullDataTableName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) MAX_LOOKBACK_AGE=" + maxLookbackAge);
+ ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
+ customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(customEdge);
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ customEdge.incrementValue(delta);
+ // Insert a row
+ conn.createStatement()
+ .execute("upsert into " + fullDataTableName + " values (1, 'Phoenix', 'A')");
+ conn.commit();
+ customEdge.incrementValue(delta);
+ conn.createStatement()
+ .execute("upsert into " + fullDataTableName + " values (2, 'Phoenix', 'B')");
+ conn.commit();
+ customEdge.incrementValue(delta);
+ String dataTableSql = "SELECT * FROM " + fullDataTableName;
+ ResultSet rs = conn.createStatement().executeQuery(dataTableSql);
+ Assert.assertTrue(rs.next());
+ Assert.assertTrue(rs.next());
+ customEdge.incrementValue(delta);
+ conn.createStatement()
+ .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC " + this.indexDDLOptions,
+ indexTableName, fullDataTableName));
+ customEdge.incrementValue(delta);
+ IndexTool indexTool = IndexToolIT.runIndexTool(useSnapshot, schemaName, dataTableName, indexTableName,
+ null, 0, IndexTool.IndexVerifyType.ONLY);
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+ assertEquals(2,
+ mrJobCounters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0, mrJobCounters.findCounter(
+ BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT.name()).getValue());
+ customEdge.incrementValue(maxLookbackAge + 1);
+ indexTool = IndexToolIT.runIndexTool(useSnapshot, schemaName, dataTableName, indexTableName, null,
+ 0, IndexTool.IndexVerifyType.BEFORE);
+ mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+ assertEquals(2, mrJobCounters.findCounter(
+ BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(2, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ startTime + MAX_LOOKBACK_AGE * 1000);
+ }
+ }
+
private Pair<Integer, Integer> countPutsAndDeletes(String tableName) throws Exception {
int numPuts = 0;
int numDeletes = 0;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index 7c31b9c95a..f95918159a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -65,6 +65,7 @@ import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
@RunWith(Parameterized.class)
public class MaxLookbackExtendedIT extends BaseTest {
private static final int MAX_LOOKBACK_AGE = 15;
+ private static final int TABLE_LEVEL_MAX_LOOKBACK_AGE = 10;
private static final int ROWS_POPULATED = 2;
public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
private String tableDDLOptions;
@@ -72,9 +73,11 @@ public class MaxLookbackExtendedIT extends BaseTest {
ManualEnvironmentEdge injectEdge;
private int ttl;
private boolean multiCF;
+ private boolean hasTableLevelMaxLookback;
- public MaxLookbackExtendedIT(boolean multiCF) {
+ public MaxLookbackExtendedIT(boolean multiCF, boolean hasTableLevelMaxLookback) {
this.multiCF = multiCF;
+ this.hasTableLevelMaxLookback = hasTableLevelMaxLookback;
}
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -103,21 +106,30 @@ public class MaxLookbackExtendedIT extends BaseTest {
Assert.assertFalse("refCount leaked", refCountLeaked);
}
- @Parameterized.Parameters(name = "MaxLookbackExtendedIT_multiCF={0}")
- public static Collection<Boolean> data() {
- return Arrays.asList(false, true);
+ @Parameterized.Parameters(name = "MaxLookbackExtendedIT_multiCF={0},hasTableLevelMaxLookback={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {false, true},
+ {false, false},
+ {true, false},
+ {true, true}
+ });
}
@Test
public void testKeepDeletedCellsWithMaxLookbackAge() throws Exception {
int versions = 2;
optionBuilder.append(", VERSIONS=" + versions);
optionBuilder.append(", KEEP_DELETED_CELLS=TRUE");
+ if(hasTableLevelMaxLookback) {
+ optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ }
tableDDLOptions = optionBuilder.toString();
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
createTable(dataTableName);
injectEdge.setValue(System.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(injectEdge);
+ long firstUpsertSCN = EnvironmentEdgeManager.currentTimeMillis();
conn.createStatement().execute("upsert into " + dataTableName
+ " values ('a', 'ab1', 'abc1', 'abcd1')");
conn.commit();
@@ -136,7 +148,12 @@ public class MaxLookbackExtendedIT extends BaseTest {
String dml = "DELETE from " + dataTableName + " WHERE id = 'a'";
Assert.assertEquals(1, conn.createStatement().executeUpdate(dml));
conn.commit();
- injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+ long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+ injectEdge.incrementValue(timeToAdvance * 1000);
+ if (hasTableLevelMaxLookback) {
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() < firstUpsertSCN +
+ MAX_LOOKBACK_AGE * 1000);
+ }
dml = "DELETE from " + dataTableName + " WHERE id = 'b'";
Assert.assertEquals(1, conn.createStatement().executeUpdate(dml));
conn.commit();
@@ -191,6 +208,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
}
@Test
public void testTooLowSCNWithMaxLookbackAge() throws Exception {
+ if(hasTableLevelMaxLookback) {
+ optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ tableDDLOptions = optionBuilder.toString();
+ }
String dataTableName = generateUniqueName();
createTable(dataTableName);
injectEdge.setValue(System.currentTimeMillis());
@@ -199,7 +220,12 @@ public class MaxLookbackExtendedIT extends BaseTest {
injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
populateTable(dataTableName);
long populateTime = EnvironmentEdgeManager.currentTimeMillis();
- injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
+ long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+ injectEdge.incrementValue(timeToAdvance * 1000 + 1000);
+ if (hasTableLevelMaxLookback) {
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ (populateTime + MAX_LOOKBACK_AGE * 1000));
+ }
Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(populateTime));
@@ -216,6 +242,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
@Test(timeout=120000L)
public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+ if(hasTableLevelMaxLookback) {
+ optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ tableDDLOptions = optionBuilder.toString();
+ }
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
String indexName = generateUniqueName();
@@ -229,7 +259,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
TableName indexTable = TableName.valueOf(indexName);
injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
- injectEdge.incrementValue(10); //make sure we delete at a different ts
+ injectEdge.incrementValue(5); //make sure we delete at a different ts
Statement stmt = conn.createStatement();
stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
Assert.assertEquals(1, stmt.getUpdateCount());
@@ -252,7 +282,8 @@ public class MaxLookbackExtendedIT extends BaseTest {
assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
//wait for the lookback time. After this compactions should purge the deleted row
- injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+ long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+ injectEdge.incrementValue(timeToAdvance * 1000);
long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
String notDeletedRowSql =
String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
@@ -266,6 +297,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
" values ('c', 'cd', 'cde', 'cdef')");
conn.commit();
injectEdge.incrementValue(1L);
+ if (hasTableLevelMaxLookback) {
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ beforeDeleteSCN + MAX_LOOKBACK_AGE * 1000);
+ }
majorCompact(dataTable);
majorCompact(indexTable);
// Deleted row versions should be removed.
@@ -283,6 +318,9 @@ public class MaxLookbackExtendedIT extends BaseTest {
@Test(timeout=60000L)
public void testTTLAndMaxLookbackAge() throws Exception {
+ if(hasTableLevelMaxLookback) {
+ return;
+ }
Configuration conf = getUtility().getConfiguration();
//disable automatic memstore flushes
long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
@@ -365,6 +403,9 @@ public class MaxLookbackExtendedIT extends BaseTest {
public void testRecentMaxVersionsNotCompactedAway() throws Exception {
int versions = 2;
optionBuilder.append(", VERSIONS=" + versions);
+ if(hasTableLevelMaxLookback) {
+ optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ }
tableDDLOptions = optionBuilder.toString();
String firstValue = "abc";
String secondValue = "def";
@@ -420,18 +461,24 @@ public class MaxLookbackExtendedIT extends BaseTest {
// at the appropriate times
assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
- // Make the first row versions outside the max lookback window
- injectEdge.setValue(afterInsertSCN + MAX_LOOKBACK_AGE * 1000);
+ long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+ // Make the first two row versions of row a outside the max lookback window. Only the first version should
+ // be collected while the other more recent one should be retained.
+ injectEdge.setValue(afterFirstUpdateSCN + timeToAdvance * 1000);
+ if (hasTableLevelMaxLookback) {
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ afterFirstUpdateSCN + MAX_LOOKBACK_AGE * 1000);
+ }
majorCompact(dataTable);
majorCompact(indexTable);
// At this moment, the data table has three row versions for row a within the max
// lookback window.
// These versions have the following cells {empty, ab, abc, abcd}, {empty, def} and
// {empty, ghi}.
- assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 8);
+ assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
// The index table will have three full row versions for "a" and each will have 3 cells,
// one for each of columns empty, val2, and val3.
- assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 9);
+ assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
//empty column + 1 version each of val1,2 and 3 = 4
assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
//1 version of empty column, 1 version of val2, 1 version of val3 = 3
@@ -441,6 +488,10 @@ public class MaxLookbackExtendedIT extends BaseTest {
@Test(timeout=60000)
public void testOverrideMaxLookbackForCompaction() throws Exception {
+ if(hasTableLevelMaxLookback) {
+ optionBuilder.append(", MAX_LOOKBACK_AGE=" + TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ tableDDLOptions = optionBuilder.toString();
+ }
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableNameOne = generateUniqueName();
createTable(tableNameOne);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
index e5e60a665a..4889e65bab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -60,6 +60,7 @@ import static org.junit.Assert.assertFalse;
@Category(NeedsOwnMiniClusterTest.class)
public class MaxLookbackIT extends BaseTest {
private static final int MAX_LOOKBACK_AGE = 15;
+ private static final int TABLE_LEVEL_MAX_LOOKBACK_AGE = 10;
private static final int ROWS_POPULATED = 2;
public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
private String tableDDLOptions;
@@ -119,6 +120,48 @@ public class MaxLookbackIT extends BaseTest {
Assert.fail("We should have thrown an exception for the too-early SCN");
}
+ @Test
+ public void testTooLowSCNWithTableLevelMaxLookback() throws Exception {
+ String dataTableName1 = generateUniqueName();
+ StringBuilder optionBuilderForDataTable1 = new StringBuilder(optionBuilder);
+ optionBuilderForDataTable1.append("MAX_LOOKBACK_AGE="+((TABLE_LEVEL_MAX_LOOKBACK_AGE + 3) * 1000));
+ tableDDLOptions = optionBuilderForDataTable1.toString();
+ createTable(dataTableName1);
+ StringBuilder optionBuilderForDataTable2 = new StringBuilder(optionBuilder);
+ optionBuilderForDataTable2.append("MAX_LOOKBACK_AGE="+(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000));
+ String dataTableName2 = generateUniqueName();
+ tableDDLOptions = optionBuilderForDataTable2.toString();
+ createTable(dataTableName2);
+ Assert.assertEquals(new Long((TABLE_LEVEL_MAX_LOOKBACK_AGE + 3) * 1000),
+ queryTableLevelMaxLookbackAge(dataTableName1));
+ Assert.assertEquals(new Long(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000),
+ queryTableLevelMaxLookbackAge(dataTableName2));
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ //increase long enough to make sure we can find the syscat row for the table
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ populateTable(dataTableName1);
+ populateTable(dataTableName2);
+ long populateTime = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000 + 1000);
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ (populateTime + MAX_LOOKBACK_AGE * 1000));
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ populateTime + (TABLE_LEVEL_MAX_LOOKBACK_AGE + 3) * 1000);
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(populateTime));
+ try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
+ connscn.createStatement().executeQuery("select * from " + dataTableName1 + " where " +
+ "val3 in (select val3 from " + dataTableName2 + ")");
+ } catch (SQLException se) {
+ SQLExceptionCode code =
+ SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
+ TestUtil.assertSqlExceptionCode(code, se);
+ return;
+ }
+ Assert.fail("We should have thrown an exception for the too-early SCN");
+ }
+
@Test(timeout=120000L)
public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -186,6 +229,75 @@ public class MaxLookbackIT extends BaseTest {
}
}
+ @Test
+ public void testRecentlyDeletedRowsNotCompactedAwayWithTableLevelMaxLookback() throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ optionBuilder.append("MAX_LOOKBACK_AGE="+(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000));
+ tableDDLOptions = optionBuilder.toString();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+
+ TableName dataTable = TableName.valueOf(dataTableName);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, 1);
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ TableName indexTable = TableName.valueOf(indexName);
+ injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
+ long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(5); //make sure we delete at a different ts
+ Statement stmt = conn.createStatement();
+ stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
+ Assert.assertEquals(1, stmt.getUpdateCount());
+ conn.commit();
+ //select stmt to get row we deleted
+ String sql = String.format("SELECT * FROM %s WHERE id = 'a'", dataTableName);
+ String indexSql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName);
+ int rowsPlusDeleteMarker = ROWS_POPULATED;
+ assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+ flush(dataTable);
+ flush(indexTable);
+ assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
+ injectEdge.incrementValue(1); //new ts for major compaction
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+ assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+ //wait for the lookback time. After this compactions should purge the deleted row
+ injectEdge.incrementValue(TABLE_LEVEL_MAX_LOOKBACK_AGE * 1000);
+ long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+ String notDeletedRowSql =
+ String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
+ String notDeletedIndexRowSql =
+ String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName);
+ assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+ assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+ assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+ assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
+ conn.createStatement().execute("upsert into " + dataTableName +
+ " values ('c', 'cd', 'cde', 'cdef')");
+ conn.commit();
+ injectEdge.incrementValue(1L);
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ beforeDeleteSCN + MAX_LOOKBACK_AGE * 1000);
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ //should still be ROWS_POPULATED because we added one and deleted one
+ assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+ assertRawRowCount(conn, indexTable, ROWS_POPULATED);
+
+ //deleted row should be gone, but not deleted row should still be there.
+ assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
+ assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, false);
+ assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
+ assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
+ }
+ }
+
@Test(timeout=60000L)
public void testTTLAndMaxLookbackAge() throws Exception {
ttl = 20;
@@ -258,6 +370,145 @@ public class MaxLookbackIT extends BaseTest {
}
}
+ @Test
+ public void testTTLAndTableLevelMaxLookbackWithMaxLookbackMoreThanTTL() throws Exception {
+ ttl = 8;
+ long maxLookbackAge = 12;
+ optionBuilder.append("TTL=").append(ttl).append(", MAX_LOOKBACK_AGE=").
+ append(maxLookbackAge * 1000);
+ tableDDLOptions = optionBuilder.toString();
+ Configuration conf = getUtility().getConfiguration();
+ //disable automatic memstore flushes
+ long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+ HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+ populateTable(dataTableName);
+ createIndex(dataTableName, indexName, 1);
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.incrementValue(1);
+ long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ TableName dataTable = TableName.valueOf(dataTableName);
+ TableName indexTable = TableName.valueOf(indexName);
+ assertTableHasTtl(conn, dataTable, ttl);
+ assertTableHasTtl(conn, indexTable, ttl);
+ //first make sure we inserted correctly
+ String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
+ String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
+ assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
+ assertExplainPlan(conn, indexSql, dataTableName, indexName);
+ assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true);
+ int originalRowCount = 2;
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ //force a flush
+ flush(dataTable);
+ flush(indexTable);
+ //flush shouldn't have changed it
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ long timeToAdvance = (ttl * 1000) -
+ (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+ if (timeToAdvance > 0) {
+ injectEdge.incrementValue(timeToAdvance);
+ }
+ //make sure it's still on disk
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ injectEdge.incrementValue(1); //get a new timestamp for compaction
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ //nothing should have been purged by this major compaction
+ assertRawRowCount(conn, dataTable, originalRowCount);
+ assertRawRowCount(conn, indexTable, originalRowCount);
+ //now wait till max lookback as max lookback is greater than TTL
+ timeToAdvance = (maxLookbackAge * 1000) -
+ (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
+ if (timeToAdvance > 0) {
+ injectEdge.incrementValue(timeToAdvance);
+ }
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() >
+ afterFirstInsertSCN + ttl * 1000);
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ afterFirstInsertSCN + MAX_LOOKBACK_AGE * 1000);
+ //make sure that we can compact away the now-expired rows
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ //note that before HBase 1.4, we don't have HBASE-17956
+ // and this will always return 0 whether it's still on-disk or not
+ assertRawRowCount(conn, dataTable, 0);
+ assertRawRowCount(conn, indexTable, 0);
+ }
+ finally {
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+ }
+ }
+
+ @Test
+ public void testExpiredRowsAreNotFlushedWithTableLevelMaxLookback() throws Exception {
+ ttl = 8;
+ long maxLookbackAge = 12;
+ optionBuilder.append("TTL=").append(ttl).append(", MAX_LOOKBACK_AGE=").
+ append(maxLookbackAge * 1000);
+ tableDDLOptions = optionBuilder.toString();
+ int delta = 1;
+ Configuration conf = getUtility().getConfiguration();
+ //disable automatic memstore flushes
+ long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
+ HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ createTable(dataTableName);
+ createIndex(dataTableName, indexName, 1);
+ TableName dataTable = TableName.valueOf(dataTableName);
+ TableName indexTable = TableName.valueOf(indexName);
+ assertTableHasTtl(conn, dataTable, ttl);
+ assertTableHasTtl(conn, indexTable, ttl);
+ injectEdge.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.incrementValue(delta);
+ populateTable(dataTableName);
+ long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
+ injectEdge.incrementValue(delta);
+ int expectedNumRows = 2;
+ assertRawRowCount(conn, dataTable, expectedNumRows);
+ assertRawRowCount(conn, indexTable, expectedNumRows);
+ injectEdge.incrementValue(ttl * 1000);
+ Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
+ afterFirstInsertSCN + maxLookbackAge * 1000);
+ flush(dataTable);
+ flush(indexTable);
+ assertRawRowCount(conn, dataTable, expectedNumRows);
+ assertRawRowCount(conn, indexTable, expectedNumRows);
+ injectEdge.incrementValue(delta);
+ conn.createStatement().execute("upsert into " + dataTableName +
+ " values ('c', 'cd', 'cde', 'cdef')");
+ conn.commit();
+ conn.createStatement().execute("upsert into " + dataTableName +
+ " values ('d', 'de', 'def', 'defg')");
+ conn.commit();
+ expectedNumRows += 2;
+ injectEdge.incrementValue(delta);
+ assertRawRowCount(conn, dataTable, expectedNumRows);
+ assertRawRowCount(conn, indexTable, expectedNumRows);
+ injectEdge.incrementValue(maxLookbackAge * 1000);
+ flush(dataTable);
+ flush(indexTable);
+ // Recent two rows expired so, they won't be flushed
+ assertRawRowCount(conn, dataTable, expectedNumRows - 2);
+ assertRawRowCount(conn, indexTable, expectedNumRows - 2);
+ }
+ finally {
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
+ }
+ }
+
@Test(timeout=60000)
public void testRecentMaxVersionsNotCompactedAway() throws Exception {
int versions = 2;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index e65bee738c..852cf8f01f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -68,14 +68,16 @@ public class TableTTLIT extends BaseTest {
private final boolean multiCF;
private final boolean columnEncoded;
private final KeepDeletedCells keepDeletedCells;
+ private final Integer tableLevelMaxLooback;
public TableTTLIT(boolean multiCF, boolean columnEncoded,
- KeepDeletedCells keepDeletedCells, int versions, int ttl) {
+ KeepDeletedCells keepDeletedCells, int versions, int ttl, Integer tableLevelMaxLooback) {
this.multiCF = multiCF;
this.columnEncoded = columnEncoded;
this.keepDeletedCells = keepDeletedCells;
this.versions = versions;
this.ttl = ttl;
+ this.tableLevelMaxLooback = tableLevelMaxLooback;
}
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -103,6 +105,9 @@ public class TableTTLIT extends BaseTest {
} else {
optionBuilder.append(", COLUMN_ENCODED_BYTES=0");
}
+ if (tableLevelMaxLooback != null) {
+ optionBuilder.append(", MAX_LOOKBACK_AGE=").append(tableLevelMaxLooback * 1000);
+ }
this.tableDDLOptions = optionBuilder.toString();
injectEdge = new ManualEnvironmentEdge();
injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
@@ -117,15 +122,18 @@ public class TableTTLIT extends BaseTest {
}
@Parameterized.Parameters(name = "TableTTLIT_multiCF={0}, columnEncoded={1}, "
- + "keepDeletedCells={2}, versions={3}, ttl={4}")
+ + "keepDeletedCells={2}, versions={3}, ttl={4}, tableLevelMaxLookback={5}")
public static synchronized Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { false, false, KeepDeletedCells.FALSE, 1, 100},
- { false, false, KeepDeletedCells.TRUE, 5, 50},
- { false, false, KeepDeletedCells.TTL, 1, 25},
- { true, false, KeepDeletedCells.FALSE, 5, 50},
- { true, false, KeepDeletedCells.TRUE, 1, 25},
- { true, false, KeepDeletedCells.TTL, 5, 100} });
+ { false, false, KeepDeletedCells.FALSE, 1, 100, null},
+ { false, false, KeepDeletedCells.TRUE, 5, 50, null},
+ { false, false, KeepDeletedCells.TTL, 1, 25, null},
+ { true, false, KeepDeletedCells.FALSE, 5, 50, null},
+ { true, false, KeepDeletedCells.TRUE, 1, 25, null},
+ { true, false, KeepDeletedCells.TTL, 5, 100, null},
+ { false, false, KeepDeletedCells.FALSE, 1, 100, 15},
+ { false, false, KeepDeletedCells.TRUE, 5, 50, 15},
+ { false, false, KeepDeletedCells.TTL, 1, 25, 15}});
}
/**
@@ -146,7 +154,8 @@ public class TableTTLIT extends BaseTest {
@Test
public void testMaskingAndCompaction() throws Exception {
- final int maxDeleteCounter = MAX_LOOKBACK_AGE;
+ final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
+ final int maxDeleteCounter = maxLookbackAge;
final int maxCompactionCounter = ttl / 2;
final int maxMaskingCounter = 2 * ttl;
final byte[] rowKey = Bytes.toBytes("a");
@@ -174,7 +183,7 @@ public class TableTTLIT extends BaseTest {
}
if (maskingCounter-- == 0) {
updateRow(conn, tableName, noCompactTableName, "a");
- injectEdge.incrementValue((ttl + MAX_LOOKBACK_AGE + 1) * 1000);
+ injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
LOG.debug("Masking " + i + " current time: " + injectEdge.currentTime());
ResultSet rs = conn.createStatement().executeQuery(
"SELECT count(*) FROM " + tableName);
@@ -205,7 +214,7 @@ public class TableTTLIT extends BaseTest {
}
afterCompaction = false;
compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX);
- long scn = injectEdge.currentTime() - MAX_LOOKBACK_AGE * 1000;
+ long scn = injectEdge.currentTime() - maxLookbackAge * 1000;
long scnEnd = injectEdge.currentTime();
long scnStart = Math.max(scn, startTime);
for (scn = scnEnd; scn >= scnStart; scn -= 1000) {
@@ -224,6 +233,9 @@ public class TableTTLIT extends BaseTest {
@Test
public void testRowSpansMultipleTTLWindows() throws Exception {
+ if (tableLevelMaxLooback != null) {
+ return;
+ }
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
index 60de875c40..eb39199380 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.thirdparty.com.google.common.collect.Lists
.newArrayListWithExpectedSize;
import static org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost
.PHOENIX_META_DATA_COPROCESSOR_CONF_KEY;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
import static org.apache.phoenix.util.TestUtil.analyzeTable;
import static org.apache.phoenix.util.TestUtil.getAllSplits;
import static org.junit.Assert.assertArrayEquals;
@@ -29,6 +30,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
import java.math.BigDecimal;
import java.sql.Connection;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
@@ -1296,4 +1299,86 @@ public class ViewIT extends SplitSystemCatalogIT {
}
}
+ @Test
+ public void testCreateViewWithTableLevelMaxLookbackAge() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ Long maxLookbackAge = 300L;
+ String createDdl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL PRIMARY KEY, col1 integer) MAX_LOOKBACK_AGE="+maxLookbackAge;
+ try(Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(createDdl);
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullTableName));
+ String childViewName = generateUniqueName();
+ String fullChildViewName = SchemaUtil.getTableName(schemaName, childViewName);
+ createDdl = "CREATE VIEW " + fullChildViewName + " AS SELECT * FROM " + fullTableName;
+ stmt.execute(createDdl);
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullChildViewName));
+ String grandChildViewName = generateUniqueName();
+ String fullGrandChildViewName = SchemaUtil.getTableName(schemaName, grandChildViewName);
+ createDdl = "CREATE VIEW " + fullGrandChildViewName + " (col2 varchar) AS SELECT * FROM " + fullChildViewName;
+ stmt.execute(createDdl);
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(fullGrandChildViewName));
+ String childViewIndexName = generateUniqueName();
+ createDdl = "CREATE INDEX " + childViewIndexName + " ON " + fullChildViewName + " (COL1)";
+ stmt.execute(createDdl);
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(
+ SchemaUtil.getTableName(schemaName, childViewIndexName)));
+ String grandChildViewIndexName = generateUniqueName();
+ createDdl = "CREATE INDEX " + grandChildViewIndexName + " ON " + fullGrandChildViewName + " (COL2)";
+ stmt.execute(createDdl);
+ assertEquals(maxLookbackAge, queryTableLevelMaxLookbackAge(
+ SchemaUtil.getTableName(schemaName, grandChildViewIndexName)));
+ }
+ }
+
+ @Test
+ public void testCreateInvalidViewWithTableLevelMaxLookbackAge() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ Long maxLookbackAge = 300L;
+ String createDdl = "CREATE TABLE " + fullTableName +
+ " (id char(1) NOT NULL PRIMARY KEY, col1 integer)";
+ try(Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(createDdl);
+ String viewName = generateUniqueName();
+ String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+ createDdl = "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullTableName;
+ String viewOptions = " MAX_LOOKBACK_AGE=" + maxLookbackAge;
+ String createDdlWithViewOptions = createDdl + viewOptions;
+ SQLException err = assertThrows(SQLException.class, () -> stmt.execute(createDdlWithViewOptions));
+ assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+ stmt.execute(createDdl);
+ String viewIndexName = generateUniqueName();
+ createDdl = "CREATE INDEX " + viewIndexName + " ON " + fullViewName + " (COL1)";
+ String createIndexDdlWithViewOptions = createDdl + viewOptions;
+ err = assertThrows(SQLException.class, () -> stmt.execute(createIndexDdlWithViewOptions));
+ assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+ }
+ }
+
+ @Test
+ public void testMappedViewForNullMaxLookbackAge() throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ byte[] cfA = Bytes.toBytes(SchemaUtil.normalizeIdentifier("a"));
+ String dataTableName = generateUniqueName();
+ byte[] hbaseDataTableName = SchemaUtil.getTableNameAsBytes("", dataTableName);
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(hbaseDataTableName));
+ builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cfA));
+ admin.createTable(builder.build());
+ String createDdl = "CREATE VIEW " + dataTableName + " (id varchar primary key, a.col1 varchar)";
+ stmt.execute(createDdl);
+ assertNull(queryTableLevelMaxLookbackAge(dataTableName));
+ String viewIndexName = generateUniqueName();
+ createDdl = "CREATE INDEX " + viewIndexName + " ON " + dataTableName + " (a.col1)";
+ stmt.execute(createDdl);
+ assertNull(queryTableLevelMaxLookbackAge(viewIndexName));
+ }
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
index f6b7b3a99e..cb07031ac5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
@@ -27,6 +27,8 @@ import static org.apache.phoenix.exception.SQLExceptionCode
import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MUTATE_TABLE;
import static org.apache.phoenix.exception.SQLExceptionCode
.NOT_NULLABLE_COLUMN_IN_ROW_KEY;
+import static org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData
.SYSTEM_LINK_HBASE_TABLE_NAME;
@@ -49,6 +51,8 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -1371,6 +1375,36 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
assertPKs(rs, new String[] {"K1", "K2", "K3", "K4"});
}
+ @Test
+ public void testAlterViewAndViewIndexMaxLookbackAgeFails() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ try(Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ String ddl = "CREATE TABLE " + fullDataTableName + " (ID VARCHAR NOT NULL PRIMARY KEY, COL1 INTEGER)";
+ stmt.execute(ddl);
+ assertNull(queryTableLevelMaxLookbackAge(fullDataTableName));
+ String viewName = generateUniqueName();
+ String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+ ddl = "CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullDataTableName;
+ stmt.execute(ddl);
+ assertNull(queryTableLevelMaxLookbackAge(fullViewName));
+ String alterViewDdl = "ALTER VIEW " + fullViewName + " SET MAX_LOOKBACK_AGE = 300";
+ SQLException err = assertThrows(SQLException.class, () -> stmt.execute(alterViewDdl));
+ assertEquals(VIEW_WITH_PROPERTIES.getErrorCode(), err.getErrorCode());
+ String viewIndexName = generateUniqueName();
+ String fullViewIndexName = SchemaUtil.getTableName(schemaName, viewIndexName);
+ ddl = "CREATE INDEX " + viewIndexName + " ON " + fullViewName + " (COL1)";
+ stmt.execute(ddl);
+ assertNull(queryTableLevelMaxLookbackAge(fullViewIndexName));
+ String alterViewIndexDdl = "ALTER INDEX " + viewIndexName + " ON " + fullViewName +
+ " ACTIVE SET MAX_LOOKBACK_AGE = 300";
+ err = assertThrows(SQLException.class, () -> stmt.execute(alterViewIndexDdl));
+ assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+ }
+ }
+
private void assertPKs(ResultSet rs, String[] expectedPKs)
throws SQLException {
List<String> pkCols = newArrayListWithExpectedSize(expectedPKs.length);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index a75a61fc47..c306c4e886 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -21,12 +21,14 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_UPDATE_CACHE_FREQ_FOR_INDEX;
+import static org.apache.phoenix.exception.SQLExceptionCode.MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_UPDATE_CACHE_FREQUENCY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -948,6 +950,28 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
asssertIsWALDisabled(conn,indexName,false);
}
+ @Test
+ public void testAlterIndexMaxLookbackAgeFails() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullDataTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ try(Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ String ddl = "CREATE TABLE " + fullDataTableName + " (id varchar not null primary key, col1 integer)";
+ stmt.execute(ddl);
+ assertNull(queryTableLevelMaxLookbackAge(fullDataTableName));
+ String indexName = generateUniqueName();
+ String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ ddl = "CREATE INDEX " + indexName + " ON " + fullDataTableName + " (COL1)";
+ stmt.execute(ddl);
+ assertNull(queryTableLevelMaxLookbackAge(fullIndexName));
+ String alterIndexDdl = "ALTER INDEX " + indexName + " ON " + fullDataTableName +
+ " ACTIVE SET MAX_LOOKBACK_AGE = 300";
+ SQLException err = assertThrows(SQLException.class, () -> stmt.execute(alterIndexDdl));
+ assertEquals(MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), err.getErrorCode());
+ }
+ }
+
private static void asssertIsWALDisabled(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
assertEquals(expectedValue, pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).isWALDisabled());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
index 894bf907a0..ba36ccc796 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
@@ -19,8 +19,13 @@ package org.apache.phoenix.end2end.transform;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.phoenix.coprocessor.tasks.TransformMonitorTask;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtilHelper;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -43,14 +48,21 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.Ignore;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -66,17 +78,12 @@ import java.util.UUID;
import static org.apache.phoenix.end2end.index.ImmutableIndexExtendedIT.getRowCountForEmptyColValue;
import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.*;
import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.getRowCount;
+import static org.apache.phoenix.schema.PTable.TransformStatus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -84,10 +91,13 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class TransformToolIT extends ParallelStatsDisabledIT {
private static final Logger LOGGER = LoggerFactory.getLogger(TransformToolIT.class);
- private final String tableDDLOptions;
+ private String tableDDLOptions;
+ private boolean mutable;
+ private StringBuilder optionBuilder = new StringBuilder();
@Parameterized.Parameters(
name = "mutable={0}")
@@ -101,8 +111,8 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
public TransformToolIT(boolean mutable) {
- StringBuilder optionBuilder = new StringBuilder();
optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=NONE ");
+ this.mutable = mutable;
if (!mutable) {
optionBuilder.append(", IMMUTABLE_ROWS=true ");
}
@@ -127,10 +137,32 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@AfterClass
- public static synchronized void cleanup() {
+ public static synchronized void tearDown() {
TransformMonitorTask.disableTransformMonitorTask(false);
}
+ @Before
+ public void createIndexToolTables() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ IndexTool.createIndexToolTables(conn);
+ }
+ resetIndexRegionObserverFailPoints();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ deleteAllRows(conn,
+ TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES));
+ deleteAllRows(conn,
+ TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME));
+ }
+ EnvironmentEdgeManager.reset();
+ resetIndexRegionObserverFailPoints();
+ }
+
private void createTableAndUpsertRows(Connection conn, String dataTableFullName, int numOfRows) throws SQLException {
createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
}
@@ -281,6 +313,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
* Test presplitting an index table
*/
@Test
+ @Ignore("Test is already broken")
public void testSplitTable() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -353,6 +386,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ @Ignore("Test is already broken")
public void testDataAfterTransformingMultiColFamilyTable() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -460,6 +494,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ @Ignore("Test is already broken")
public void testTransformMutationReadRepair() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -570,6 +605,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ @Ignore("Test is already broken")
public void testTransformMutationFailureRepair() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -684,6 +720,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ @Ignore("Test is already broken")
public void testTransformVerify() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -793,6 +830,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ @Ignore("Test is already broken")
public void testTransformVerify_shouldFixUnverified() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -959,6 +997,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ @Ignore("Test is already broken")
public void testTransformForGlobalViews() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -1033,6 +1072,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
@Test
+ @Ignore("Test is already broken")
public void testTransformForTenantViews() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -1120,6 +1160,85 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testInvalidRowsWithTableLevelMaxLookback() throws Exception {
+ if (! mutable) {
+ return;
+ }
+ int maxLookbackAge = 12000;
+ optionBuilder.append(", MAX_LOOKBACK_AGE=").append(maxLookbackAge);
+ tableDDLOptions = optionBuilder.toString();
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ int delta = 5;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 2;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
+ PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName + " SET COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName,
+ null, null, conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertEquals(Long.valueOf(maxLookbackAge), queryTableLevelMaxLookbackAge(record.getNewPhysicalTableName()));
+ assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
+ PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+ ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
+ customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(customEdge);
+ customEdge.incrementValue(delta);
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ PreparedStatement stmt = conn.prepareStatement(upsertQuery);
+ IndexToolIT.upsertRow(stmt, ++numOfRows);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ customEdge.incrementValue(delta);
+ // 2 initial rows are missing in the new table as TransformTool hasn't run yet
+ assertEquals(numOfRows - 2, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+ List<String> args = getArgList(schemaName, dataTableName, null, null, null,
+ null, false, false, false, false, false);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+ TransformTool transformTool = runTransformTool(args.toArray(new String[0]), 0);
+ CounterGroup mrJobCounters = getMRJobCounters(transformTool);
+ assertEquals(1,
+ mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(numOfRows, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+ customEdge.incrementValue(delta);
+ record = Transform.getTransformRecord(schemaName, dataTableName,
+ null, null, conn.unwrap(PhoenixConnection.class));
+ assertEquals(TransformStatus.PENDING_CUTOVER.name(), record.getTransformStatus());
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ stmt = conn.prepareStatement(upsertQuery);
+ IndexToolIT.upsertRow(stmt, ++numOfRows);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ customEdge.incrementValue(delta);
+ assertEquals(numOfRows, getRowCount(conn, record.getNewPhysicalTableName()));
+ assertEquals(numOfRows, getRowCount(conn, dataTableFullName));
+ customEdge.incrementValue(maxLookbackAge);
+ args = getArgList(schemaName, dataTableName, null, null, null,
+ null, false, false, false, false, false);
+ args.add("-v");
+ args.add(IndexTool.IndexVerifyType.BEFORE.getValue());
+ transformTool = runTransformTool(args.toArray(new String[0]), 0);
+ mrJobCounters = getMRJobCounters(transformTool);
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(1,
+ mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(1, mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
+ }
+ }
public static Connection getTenantConnection(String tenant) throws SQLException {
Properties props = new Properties();
@@ -1219,4 +1338,7 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
return tt;
}
-}
\ No newline at end of file
+ public static CounterGroup getMRJobCounters(TransformTool transformTool) throws IOException {
+ return transformTool.getJob().getCounters().getGroup(PhoenixIndexToolJobCounters.class.getName());
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 219374b9bf..76175e4dc9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -127,6 +127,11 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -142,6 +147,7 @@ import org.apache.phoenix.end2end.ParallelStatsEnabledIT;
import org.apache.phoenix.end2end.ParallelStatsEnabledTest;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
@@ -2148,4 +2154,42 @@ public abstract class BaseTest {
}
return false;
}
+
+ protected Long queryTableLevelMaxLookbackAge(String fullTableName) throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ return pconn.getTableNoCache(fullTableName).getMaxLookbackAge();
+ }
+ }
+
+ public void deleteAllRows(Connection conn, TableName tableName) throws SQLException,
+ IOException, InterruptedException {
+ Scan scan = new Scan();
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().
+ getAdmin();
+ org.apache.hadoop.hbase.client.Connection hbaseConn = admin.getConnection();
+ Table table = hbaseConn.getTable(tableName);
+ boolean deletedRows = false;
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ for (Result r : scanner) {
+ Delete del = new Delete(r.getRow());
+ table.delete(del);
+ deletedRows = true;
+ }
+ } catch (Exception e) {
+ //if the table doesn't exist, we have no rows to delete. Easier to catch
+ //than to pre-check for existence
+ }
+ //don't flush/compact if we didn't write anything, because we'll hang forever
+ if (deletedRows) {
+ getUtility().getAdmin().flush(tableName);
+ TestUtil.majorCompact(getUtility(), tableName);
+ }
+ }
+
+ static public void resetIndexRegionObserverFailPoints() {
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ }
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index cf076f08ef..79b0168f77 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -830,8 +830,8 @@ public class TestUtil {
// In HBase 2.5 getLastMajorCompactionTimestamp doesn't seem to get updated when the
// clock is stopped, so check for the state going to NONE instead
if (state.equals(CompactionState.NONE) && (previousState != null
- && previousState.equals(CompactionState.MAJOR_AND_MINOR)
- || previousState.equals(CompactionState.MAJOR))) {
+ && (previousState.equals(CompactionState.MAJOR_AND_MINOR)
+ || previousState.equals(CompactionState.MAJOR)))) {
break;
}
previousState = state;