You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2020/03/26 18:04:00 UTC
[phoenix] branch 4.x updated: PHOENIX-5734 - IndexScrutinyTool should not report rows beyond maxLookBack age
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 5521fd9 PHOENIX-5734 - IndexScrutinyTool should not report rows beyond maxLookBack age
5521fd9 is described below
commit 5521fd9d130629ac17ba431a423e58e17432e303
Author: Geoffrey Jacoby <gj...@apache.org>
AuthorDate: Thu Mar 5 09:57:56 2020 -0800
PHOENIX-5734 - IndexScrutinyTool should not report rows beyond maxLookBack age
---
.../phoenix/end2end/IndexScrutinyToolBaseIT.java | 7 +-
.../end2end/IndexScrutinyToolForTenantIT.java | 9 +-
.../phoenix/end2end/IndexScrutinyToolIT.java | 89 +++++-
.../end2end/IndexScrutinyWithMaxLookbackIT.java | 304 +++++++++++++++++++++
.../org/apache/phoenix/end2end/MaxLookbackIT.java | 59 +---
.../NonParameterizedIndexScrutinyToolIT.java | 8 +-
.../hadoop/hbase/regionserver/ScanInfoUtil.java | 4 +-
.../org/apache/phoenix/compile/QueryCompiler.java | 1 +
.../mapreduce/index/IndexScrutinyMapper.java | 54 +++-
.../mapreduce/index/IndexScrutinyTableOutput.java | 48 +++-
.../phoenix/mapreduce/index/IndexScrutinyTool.java | 32 ++-
.../index/PhoenixScrutinyJobCounters.java | 8 +-
.../ManualEnvironmentEdge.java} | 43 ++-
.../java/org/apache/phoenix/util/SchemaUtil.java | 19 ++
.../index/IndexScrutinyMapperForTest.java | 1 +
.../java/org/apache/phoenix/util/TestUtil.java | 44 ++-
16 files changed, 602 insertions(+), 128 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index 54a5408..980c0fa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -16,7 +16,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapper;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
@@ -54,8 +54,9 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
new ReadOnlyProps(clientProps.entrySet().iterator()));
}
- protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
- IndexScrutinyTool scrutiny = new IndexScrutinyTool(IndexScrutinyMapperForTest.class);
+ protected List<Job> runScrutiny(Class<? extends IndexScrutinyMapper> mapperClass,
+ String[] cmdArgs) throws Exception {
+ IndexScrutinyTool scrutiny = new IndexScrutinyTool(mapperClass);
Configuration conf = new Configuration(getUtility().getConfiguration());
scrutiny.setConf(conf);
int status = scrutiny.run(cmdArgs);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
index 351af9c..f4e1ce7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
@@ -16,6 +16,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
@@ -115,7 +116,7 @@ public class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId,
EnvironmentEdgeManager.currentTimeMillis());
- List<Job> completedJobs = runScrutiny(argValues);
+ List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
// Sunny case, both index and view are equal. 1 row
assertEquals(1, completedJobs.size());
for (Job job : completedJobs) {
@@ -145,7 +146,7 @@ public class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
argValues =
getArgValues("", globalViewName, indexNameGlobal, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
EnvironmentEdgeManager.currentTimeMillis());
- List<Job> completedJobs = runScrutiny(argValues);
+ List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
// Sunny case, both index and view are equal. 1 row
assertEquals(1, completedJobs.size());
for (Job job : completedJobs) {
@@ -200,7 +201,7 @@ public class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
String[]
argValues =
getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false, null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis());
- List<Job> completedJobs = runScrutiny(argValues);
+ List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
assertEquals(2, completedJobs.size());
for (Job job : completedJobs) {
@@ -246,7 +247,7 @@ public class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
argValues =
getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, outputFormat, null,
tenantId, EnvironmentEdgeManager.currentTimeMillis());
- List<Job> completedJobs = runScrutiny(argValues);
+ List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
assertEquals(1, completedJobs.size());
for (Job job : completedJobs) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 76fce98..a56ebb3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
@@ -75,6 +76,28 @@ import com.google.common.collect.Lists;
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
+ public static final String MISSING_ROWS_QUERY_TEMPLATE =
+ "SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , " +
+ "\"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , " +
+ "\"BEYOND_MAX_LOOKBACK\" , \"ID\" , \"NAME\" , \"EMPLOY_DATE\" , \"ZIP\" , \":ID\" , " +
+ "\"0:NAME\" , \"0:EMPLOY_DATE\" , \"0:ZIP\" " +
+ "FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"NAME\" VARCHAR," +
+ "\"EMPLOY_DATE\" TIMESTAMP,\"ZIP\" INTEGER,\":ID\" INTEGER,\"0:NAME\" VARCHAR," +
+ "\"0:EMPLOY_DATE\" DECIMAL,\"0:ZIP\" INTEGER) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\"," +
+ "\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('[TableName]','[IndexName]'," +
+ "[ScrutinyTs],[HasTargetRow]))";
+ public static final String BEYOND_MAX_LOOKBACK_QUERY_TEMPLATE =
+ "SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , " +
+ "\"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , " +
+ "\"BEYOND_MAX_LOOKBACK\" , \"ID\" , \"NAME\" , \"EMPLOY_DATE\" , \"ZIP\" , \":ID\" , " +
+ "\"0:NAME\" , \"0:EMPLOY_DATE\" , \"0:ZIP\" " +
+ "FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"NAME\" VARCHAR," +
+ "\"EMPLOY_DATE\" TIMESTAMP,\"ZIP\" INTEGER,\":ID\" INTEGER,\"0:NAME\" VARCHAR," +
+ "\"0:EMPLOY_DATE\" DECIMAL,\"0:ZIP\" INTEGER) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\"," +
+ "\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\", \"BEYOND_MAX_LOOKBACK\") " +
+ "IN (('[TableName]','[IndexName]'," +
+ "[ScrutinyTs],false,true))";
+
private String dataTableDdl;
private String indexTableDdl;
@@ -402,7 +425,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
String[]
argValues =
getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
- runScrutiny(argValues);
+ runScrutiny(IndexScrutinyMapperForTest.class, argValues);
// check the output files
Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
@@ -450,7 +473,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
String[]
argValues =
getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
- List<Job> completedJobs = runScrutiny(argValues);
+ List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
// check that the output table contains the invalid rows
long
@@ -460,6 +483,10 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
invalidRowsQuery =
IndexScrutinyTableOutput
.getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis);
+ String missingRowsQuery = getMissingRowsQuery(scrutinyTimeMillis);
+ String invalidColsQuery = getInvalidColsQuery(scrutinyTimeMillis);
+ String beyondMaxLookbackQuery = getBeyondMaxLookbackQuery(scrutinyTimeMillis);
+
ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
assertTrue(rs.next());
assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
@@ -473,12 +500,37 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+ assertFalse(rs.getBoolean("BEYOND_MAX_LOOKBACK"));
assertEquals(3, rs.getInt("ID"));
assertEquals(null, rs.getObject(":ID")); // null for missing target row
assertFalse(rs.next());
// check that the job results were written correctly to the metadata table
- assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
+ assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery, missingRowsQuery,
+ invalidColsQuery, beyondMaxLookbackQuery);
+ }
+
+ private String getMissingRowsQuery(long scrutinyTimeMillis) {
+ return MISSING_ROWS_QUERY_TEMPLATE.
+ replace("[TableName]", dataTableFullName).
+ replace("[IndexName]", indexTableFullName).
+ replace("[ScrutinyTs]", Long.toString(scrutinyTimeMillis)).
+ replace("[HasTargetRow]", "false");
+ }
+
+ private String getInvalidColsQuery(long scrutinyTimeMillis) {
+ return MISSING_ROWS_QUERY_TEMPLATE.
+ replace("[TableName]", dataTableFullName).
+ replace("[IndexName]", indexTableFullName).
+ replace("[ScrutinyTs]", Long.toString(scrutinyTimeMillis)).
+ replace("[HasTargetRow]", "true");
+ }
+
+ private String getBeyondMaxLookbackQuery(long scrutinyTimeMillis) {
+ return BEYOND_MAX_LOOKBACK_QUERY_TEMPLATE.
+ replace("[TableName]", dataTableFullName).
+ replace("[IndexName]", indexTableFullName).
+ replace("[ScrutinyTs]", Long.toString(scrutinyTimeMillis));
}
/**
@@ -490,7 +542,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
String[]
argValues =
getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
- List<Job> completedJobs = runScrutiny(argValues);
+ List<Job> completedJobs = runScrutiny(IndexScrutinyMapperForTest.class, argValues);
long
scrutinyTimeMillis =
PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
@@ -535,7 +587,11 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
conn.commit();
}
- private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, String invalidRowsQuery) throws SQLException {
+ private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
+ String invalidRowsQuery,
+ String missingRowsQuery,
+ String invalidColValuesQuery,
+ String beyondMaxLookbackQuery) throws SQLException {
ResultSet rs;
ResultSet
metadataRs =
@@ -545,11 +601,13 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
assertTrue(metadataRs.next());
List<? extends Object>
expected =
- Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L,
+ Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+ SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L,
1L, 2L, 1L, 1L,
"[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
"[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
- invalidRowsQuery);
+ invalidRowsQuery, missingRowsQuery, invalidColValuesQuery,
+ beyondMaxLookbackQuery, 0L);
if (dataTableDdl.contains("SALT_BUCKETS")) {
expected =
Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
@@ -557,9 +615,9 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
2L, 1L, 2L,
"[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
"[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]",
- invalidRowsQuery);
+ invalidRowsQuery, missingRowsQuery,
+ invalidColValuesQuery, beyondMaxLookbackQuery, 0L);
}
-
assertRsValues(metadataRs, expected);
String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
rs = conn.createStatement().executeQuery(missingTargetQuery);
@@ -571,13 +629,19 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
assertTrue(rs.next());
assertEquals(2, rs.getInt("ID"));
assertFalse(rs.next());
+ String lookbackQuery = metadataRs.getString("INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK");
+ rs = conn.createStatement().executeQuery(lookbackQuery);
+ assertFalse(rs.next());
}
// assert the result set contains the expected values in the given order
private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
for (int i = 0; i < expected.size(); i++) {
- assertEquals(expected.get(i), rs.getObject(i + 1));
+ assertEquals("Failed comparing 1-based column " + (i + 1) +
+ " out of " + expected.size(),
+ expected.get(i), rs.getObject(i + 1));
}
+
}
private void generateUniqueTableNames() {
@@ -609,7 +673,8 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
}
private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
- return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
+ return runScrutiny(IndexScrutinyMapperForTest.class,
+ getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH,
false, null, null, null, scrutinyTS));
}
@@ -629,7 +694,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
cmdArgs =
getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
false, null, null, null, Long.MAX_VALUE);
- return runScrutiny(cmdArgs);
+ return runScrutiny(IndexScrutinyMapperForTest.class, cmdArgs);
}
private void upsertRow(PreparedStatement stmt, int id, String name, int zip) throws SQLException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
new file mode 100644
index 0000000..dea19ff
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyWithMaxLookbackIT.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapper;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class IndexScrutinyWithMaxLookbackIT extends IndexScrutinyToolBaseIT {
+
+ private static PreparedStatement upsertDataStmt;
+ private static String dataTableFullName;
+ private static String schema;
+ private static String dataTableName;
+ private static String indexTableName;
+ private static String indexTableFullName;
+ private static String viewName;
+ private static boolean isViewIndex;
+ private static ManualEnvironmentEdge testClock;
+ public static final String UPSERT_DATA = "UPSERT INTO %s VALUES (?, ?, ?)";
+ public static final String DELETE_SCRUTINY_METADATA = "DELETE FROM "
+ + IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME;
+ public static final String DELETE_SCRUTINY_OUTPUT = "DELETE FROM " +
+ IndexScrutinyTableOutput.OUTPUT_TABLE_NAME;
+ public static final int MAX_LOOKBACK = 6;
+ private long scrutinyTs;
+
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+ props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+ props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+ Integer.toString(MAX_LOOKBACK));
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void setupTest() throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl(),
+ PropertiesUtil.deepCopy(TEST_PROPERTIES))){
+ conn.createStatement().execute(DELETE_SCRUTINY_METADATA);
+ conn.createStatement().execute(DELETE_SCRUTINY_OUTPUT);
+ conn.commit();
+ } catch (TableNotFoundException tnfe){
+ //This will happen the first time
+ }
+ }
+
+ @Test
+ public void testScrutinyOnRowsBeyondMaxLookBack() throws Exception {
+ setupTables();
+ upsertDataAndScrutinize(dataTableName, dataTableFullName, testClock);
+ assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
+
+ }
+
+ @Test
+ public void testScrutinyOnRowsBeyondMaxLookback_viewIndex() throws Exception {
+ schema = "S"+generateUniqueName();
+ dataTableName = "T"+generateUniqueName();
+ dataTableFullName = SchemaUtil.getTableName(schema,dataTableName);
+ indexTableName = "VI"+generateUniqueName();
+ isViewIndex = true;
+ viewName = "V"+generateUniqueName();
+ String viewFullName = SchemaUtil.getTableName(schema,viewName);
+ String indexFullName = SchemaUtil.getTableName(schema, indexTableName);
+ String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+ + "ZIP INTEGER) COLUMN_ENCODED_BYTES = 0, VERSIONS = 1 ";
+ String viewDDL = "CREATE VIEW %s AS SELECT * FROM %s";
+ String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) VERSIONS = 1";
+ testClock = new ManualEnvironmentEdge();
+
+ try (Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ conn.createStatement().execute(String.format(dataTableDDL, dataTableFullName));
+ conn.createStatement().execute(String.format(viewDDL, viewFullName, dataTableFullName));
+ conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+ viewFullName));
+ conn.commit();
+ }
+ upsertDataAndScrutinize(viewName, viewFullName, testClock);
+ assertBeyondMaxLookbackOutput(viewFullName, indexFullName);
+ }
+
+ private void assertBeyondMaxLookbackOutput(String dataTableName, String indexTableName)
+ throws Exception {
+ try (Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ ResultSet metadataRS = IndexScrutinyTableOutput.queryAllLatestMetadata(conn, dataTableName,
+ indexTableName);
+ assertTrue("No results from scrutiny metadata query!", metadataRS.next());
+ assertEquals(1, metadataRS.getLong("BEYOND_MAX_LOOKBACK_COUNT"));
+ String beyondLookbackOutputSql =
+ metadataRS.getString("INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK");
+ assertNotNull(beyondLookbackOutputSql);
+ ResultSet outputRS = conn.createStatement().executeQuery(beyondLookbackOutputSql);
+ assertTrue("No results from scrutiny beyond max lookback output query!",
+ outputRS.next());
+ assertTrue("Beyond max lookback flag not set",
+ outputRS.getBoolean("BEYOND_MAX_LOOKBACK"));
+ assertFalse("Too many rows output from scrutiny beyond max lookback output query!",
+ outputRS.next());
+ }
+ }
+
+ @Test
+ public void testScrutinyOnDeletedRowsBeyondMaxLookBack() throws Exception {
+ setupTables();
+ upsertDataThenDeleteAndScrutinize(dataTableName, dataTableFullName, testClock);
+ assertBeyondMaxLookbackOutput(dataTableFullName, indexTableFullName);
+ }
+
+ private void setupTables() throws SQLException {
+ schema = "S" + generateUniqueName();
+ dataTableName = "T" + generateUniqueName();
+ indexTableName = "I" + generateUniqueName();
+ dataTableFullName = SchemaUtil.getTableName(schema, dataTableName);
+ indexTableFullName = SchemaUtil.getTableName(schema, indexTableName);
+ isViewIndex = false;
+ String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+ + "ZIP INTEGER) COLUMN_ENCODED_BYTES=0, VERSIONS=1";
+ String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP)";
+ testClock = new ManualEnvironmentEdge();
+
+ try (Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ conn.createStatement().execute(String.format(dataTableDDL, dataTableFullName));
+ conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+ dataTableFullName));
+ conn.commit();
+ }
+ }
+
+ private void upsertDataAndScrutinize(String tableName, String tableFullName,
+ ManualEnvironmentEdge testClock)
+ throws Exception {
+ try(Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ // insert two rows
+ populateTable(tableFullName, conn);
+ long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis() + 1;
+ testClock.setValue(afterInsertSCN);
+ EnvironmentEdgeManager.injectEdge(testClock);
+ //move forward to the time we want to scrutinize, which is less than max lookback age
+ //for the initial inserts
+ testClock.incrementValue(MAX_LOOKBACK /2 * 1000);
+ scrutinyTs = EnvironmentEdgeManager.currentTimeMillis();
+ updateIndexRows(conn);
+ //now go past max lookback age for the initial 2 inserts
+ testClock.incrementValue(MAX_LOOKBACK /2 * 1000);
+ List<Job> completedJobs = runScrutiny(schema, tableName, indexTableName, scrutinyTs);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ assertCounters(job.getCounters());
+ }
+ }
+
+ private void populateTable(String tableFullName, Connection conn) throws SQLException {
+ upsertDataStmt = getUpsertDataStmt(tableFullName, conn);
+
+ NonParameterizedIndexScrutinyToolIT.upsertRow(upsertDataStmt, 1, "name-1", 98051);
+ NonParameterizedIndexScrutinyToolIT.upsertRow(upsertDataStmt, 2, "name-2", 98052);
+ conn.commit();
+ }
+
+ private void updateIndexRows(Connection conn) throws SQLException {
+ String tableName = isViewIndex ?
+ SchemaUtil.getTableName(schema, viewName) : dataTableFullName;
+ PreparedStatement stmt = getUpsertDataStmt(tableName, conn);
+ NonParameterizedIndexScrutinyToolIT.upsertRow(stmt, 1, "name-1", 38139);
+ conn.commit();
+ }
+
+ private void upsertDataThenDeleteAndScrutinize(String tableName, String tableFullName,
+ ManualEnvironmentEdge testClock)
+ throws Exception {
+ try(Connection conn =
+ DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+ populateTable(tableFullName, conn);
+ long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis() + 1;
+ testClock.setValue(afterInsertSCN);
+ EnvironmentEdgeManager.injectEdge(testClock);
+ deleteIndexRows(conn);
+ //move forward to the time we want to scrutinize, which is less than max lookback age
+ //for the initial inserts
+ testClock.incrementValue(MAX_LOOKBACK /2 * 1000);
+ scrutinyTs = EnvironmentEdgeManager.currentTimeMillis();
+ //now go past max lookback age for the initial 2 inserts
+ testClock.incrementValue(MAX_LOOKBACK /2 * 1000);
+
+ List<Job> completedJobs = runScrutiny(schema, tableName, indexTableName, scrutinyTs);
+ Job job = completedJobs.get(0);
+ assertTrue(job.isSuccessful());
+ assertCounters(job.getCounters());
+ }
+ }
+
+ private void deleteIndexRows(Connection conn) throws SQLException {
+ String deleteSql = "DELETE FROM " + SchemaUtil.getTableName(schema, indexTableName) + " " +
+ "LIMIT 1";
+ conn.createStatement().execute(deleteSql);
+ conn.commit();
+ }
+
+ private static PreparedStatement getUpsertDataStmt(String tableFullName, Connection conn) throws SQLException {
+ return conn.prepareStatement(String.format(UPSERT_DATA, tableFullName));
+ }
+
+ private void assertCounters(Counters counters) {
+ assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+ assertEquals(1, getCounterValue(counters, BEYOND_MAX_LOOKBACK_COUNT));
+ assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+ }
+
+ private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+ Long scrutinyTs)
+ throws Exception {
+ return runScrutiny(schemaName, dataTableName, indexTableName, null, null, scrutinyTs);
+ }
+
+ private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+ Long batchSize, IndexScrutinyTool.SourceTable sourceTable,
+ Long scrutinyTs) throws Exception {
+ final String[]
+ cmdArgs =
+ getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
+ true, IndexScrutinyTool.OutputFormat.TABLE,
+ null, null, scrutinyTs);
+ return runScrutiny(MaxLookbackIndexScrutinyMapper.class, cmdArgs);
+ }
+
+ private static class MaxLookbackIndexScrutinyMapper extends IndexScrutinyMapper {
+ @Override
+ public void postSetup(){
+ try {
+ String tableToCompact;
+ if (isViewIndex){
+ String physicalDataTableName =
+ SchemaUtil.getPhysicalHBaseTableName(schema, dataTableName, false).getString();
+ tableToCompact = MetaDataUtil.getViewIndexPhysicalName(physicalDataTableName);
+ } else {
+ tableToCompact = SchemaUtil.getPhysicalHBaseTableName(schema, indexTableName, false).getString();
+ }
+ TableName indexTable = TableName.valueOf(tableToCompact);
+ testClock.incrementValue(1);
+ getUtility().getHBaseAdmin().flush(indexTable);
+ TestUtil.majorCompact(getUtility(), indexTable);
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
index 37a3c81..15396cd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -67,24 +68,6 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
ManualEnvironmentEdge injectEdge;
private int ttl;
- private class ManualEnvironmentEdge extends EnvironmentEdge {
- // Sometimes 0 ts might have a special value, so lets start with 1
- protected long value = 1L;
-
- public void setValue(long newValue) {
- value = newValue;
- }
-
- public void incrementValue(long addedValue) {
- value += addedValue;
- }
-
- @Override
- public long currentTime() {
- return this.value;
- }
- }
-
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
@@ -159,8 +142,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
injectEdge.incrementValue(1); //new ts for major compaction
- majorCompact(dataTable, beforeFirstCompactSCN);
- majorCompact(indexTable, beforeFirstCompactSCN);
+ majorCompact(dataTable);
+ majorCompact(indexTable);
assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
//wait for the lookback time. After this compactions should purge the deleted row
@@ -177,8 +160,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("upsert into " + dataTableName +
" values ('c', 'cd', 'cde', 'cdef')");
conn.commit();
- majorCompact(dataTable, beforeSecondCompactSCN);
- majorCompact(indexTable, beforeSecondCompactSCN);
+ majorCompact(dataTable);
+ majorCompact(indexTable);
//should still be ROWS_POPULATED because we added one and deleted one
assertRawRowCount(conn, dataTable, ROWS_POPULATED);
assertRawRowCount(conn, indexTable, ROWS_POPULATED);
@@ -241,8 +224,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
injectEdge.incrementValue(1); //get a new timestamp for compaction
- majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
- majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+ majorCompact(dataTable);
+ majorCompact(indexTable);
//nothing should have been purged by this major compaction
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
@@ -253,8 +236,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
injectEdge.incrementValue(timeToAdvance);
}
//make sure that we can compact away the now-expired rows
- majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
- majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+ majorCompact(dataTable);
+ majorCompact(indexTable);
//note that before HBase 1.4, we don't have HBASE-17956
// and this will always return 0 whether it's still on-disk or not
assertRawRowCount(conn, dataTable, 0);
@@ -317,16 +300,16 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
//after flush, check to make sure we can see all three versions at the appropriate times
assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
- majorCompact(dataTable, EnvironmentEdgeManager.currentTimeMillis());
- majorCompact(indexTable, EnvironmentEdgeManager.currentTimeMillis());
+ majorCompact(dataTable);
+ majorCompact(indexTable);
//after major compaction, check to make sure we can see all three versions
// at the appropriate times
assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
long afterLookbackAgeSCN = EnvironmentEdgeManager.currentTimeMillis();
- majorCompact(dataTable, afterLookbackAgeSCN);
- majorCompact(indexTable, afterLookbackAgeSCN);
+ majorCompact(dataTable);
+ majorCompact(indexTable);
//empty column, 1 version of val 1, 3 versions of val2, 1 version of val3 = 6
assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
//2 versions of empty column, 2 versions of val2,
@@ -344,20 +327,8 @@ public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
admin.flush(table);
}
- private void majorCompact(TableName table, long compactionRequestedSCN) throws Exception {
- Admin admin = getUtility().getHBaseAdmin();
- admin.majorCompact(table);
- long lastCompactionTimestamp;
- AdminProtos.GetRegionInfoResponse.CompactionState state = null;
- while ((lastCompactionTimestamp = admin.getLastMajorCompactionTimestamp(table)) < compactionRequestedSCN
- || (state = admin.getCompactionState(table)).
- equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)){
- if (LOG.isTraceEnabled()) {
- LOG.trace("Last compaction time:" + lastCompactionTimestamp);
- LOG.trace("CompactionState: " + state);
- }
- Thread.sleep(100);
- }
+ //Delegates to TestUtil.majorCompact, passing the utility returned by getUtility()
+ private void majorCompact(TableName table) throws Exception {
+ TestUtil.majorCompact(getUtility(), table);
}
private void assertMultiVersionLookbacks(String dataTableSelectSql,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
index 93a723d..2c99062 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
@@ -223,7 +223,7 @@ public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT
}
}
- private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws
+ public static void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws
SQLException {
int index = 1;
// insert row
@@ -233,13 +233,13 @@ public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT
stmt.executeUpdate();
}
- private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+ public static void upsertRow(PreparedStatement stmt, int id, String name, int val)
throws SQLException {
int index = 1;
// insert row
stmt.setInt(index++, id);
stmt.setString(index++, name);
- stmt.setInt(index++, zip);
+ stmt.setInt(index++, val);
stmt.executeUpdate();
}
@@ -254,6 +254,6 @@ public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT
cmdArgs =
getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
false, null, null, null, Long.MAX_VALUE);
- return runScrutiny(cmdArgs);
+ return runScrutiny(IndexScrutinyMapperForTest.class, cmdArgs);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
index 6854a75..e70ffc7 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -56,7 +56,7 @@ public class ScanInfoUtil {
public static long getTimeToLiveForCompactions(HColumnDescriptor columnDescriptor,
ScanInfo scanInfo) {
long ttl = scanInfo.getTtl();
- long maxLookbackTtl = getMaxLookback(scanInfo.getConfiguration());
+ long maxLookbackTtl = getMaxLookbackInMillis(scanInfo.getConfiguration());
if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
if (ttl == Long.MAX_VALUE
&& columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
@@ -110,7 +110,7 @@ public class ScanInfoUtil {
oldScanInfo.getComparator());
}
- private static long getMaxLookback(Configuration conf){
+ public static long getMaxLookbackInMillis(Configuration conf){
//config param is in seconds, switch to millis
return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 2cf9b10..5f75e8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -87,6 +87,7 @@ import org.apache.phoenix.util.ScanUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.phoenix.util.SchemaUtil;
import com.google.common.base.Optional;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 2359b4a..bf3c765 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -52,7 +53,6 @@ import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.ColumnInfo;
@@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
-
/**
* Mapper that reads from the data table and checks the rows against the index table
*/
@@ -96,6 +95,12 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
private long outputMaxRows;
private MessageDigest md5;
private long ttl;
+ private long scnTimestamp;
+ private long maxLookbackAgeMillis;
+
+    /**
+     * Exposes the SCN timestamp this mapper parsed from the job configuration during setup.
+     *
+     * @return the value held in {@code scnTimestamp}
+     */
+    protected long getScrutinyTs() {
+        return this.scnTimestamp;
+    }
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
@@ -107,6 +112,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
final Properties overrideProps = new Properties();
String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+ scnTimestamp = new Long(scn);
connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
connection.setAutoCommit(false);
batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
@@ -161,12 +167,18 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
LOGGER.info("Target table base query: " + targetTableQuery);
md5 = MessageDigest.getInstance("MD5");
ttl = getTableTtl();
+ maxLookbackAgeMillis = ScanInfoUtil.getMaxLookbackInMillis(configuration);
} catch (SQLException | NoSuchAlgorithmException e) {
tryClosingResourceSilently(this.outputUpsertStmt);
tryClosingResourceSilently(this.connection);
tryClosingResourceSilently(this.outputConn);
throw new RuntimeException(e);
}
+ postSetup();
+ }
+
+ //Hook called as the last step of setup(); the body is intentionally empty here
+ //(presumably so that subclasses can override it -- confirm against subclasses)
+ protected void postSetup() {
+
}
private static void tryClosingResourceSilently(AutoCloseable res) {
@@ -244,14 +256,10 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
// fetch results from the target table and output invalid rows
queryTargetTable(context, targetStatement, targetPkToSourceValues);
- //check if there are any invalid rows that have been expired, report them
- //with EXPIRED_ROW_COUNT
- checkIfInvalidRowsExpired(context, targetPkToSourceValues);
+ //check if any invalid rows are just temporary side effects of ttl or compaction,
+ //and if so remove them from the list and count them as separate metrics
+ categorizeInvalidRows(context, targetPkToSourceValues);
- // any source values we have left over are invalid (e.g. data table rows without
- // corresponding index row)
- context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
- .increment(targetPkToSourceValues.size());
if (outputInvalidRows) {
for (Pair<Long, List<Object>> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) {
List<Object> valuesWithoutTarget = sourceRowWithoutTargetRow.getSecond();
@@ -274,8 +282,8 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
protected void preQueryTargetTable() { }
- protected void checkIfInvalidRowsExpired(Context context,
- Map<String, Pair<Long,
+ protected void categorizeInvalidRows(Context context,
+ Map<String, Pair<Long,
List<Object>>> targetPkToSourceValues) {
Set<Map.Entry<String, Pair<Long, List<Object>>>>
entrySet = targetPkToSourceValues.entrySet();
@@ -288,8 +296,15 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
Pair<Long, List<Object>> sourceValues = entry.getValue();
Long sourceTS = sourceValues.getFirst();
if (hasRowExpiredOnSource(sourceTS, ttl)) {
- context.getCounter(PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT).increment(1);
- itr.remove();
+ context.getCounter(PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT).increment(1L);
+ itr.remove(); //don't output to the scrutiny table
+ } else if (isRowOlderThanMaxLookback(sourceTS)) {
+ context.getCounter(PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT).increment(1L);
+ //still output to the scrutiny table just in case it's useful
+ } else {
+ // otherwise it's invalid (e.g. data table rows without corresponding index row)
+ context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
+ .increment(1L);
}
}
}
@@ -299,7 +314,16 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
return ttl != Integer.MAX_VALUE && sourceTS + ttl*1000 < currentTS;
}
- private long getTableTtl() throws SQLException, IOException {
+    /**
+     * Tells whether a source row's timestamp falls at or before the start of the max
+     * lookback window, measured from the current time.
+     *
+     * @param sourceTS timestamp of the source row, in milliseconds
+     * @return {@code false} when the max lookback age equals its default value (the check is
+     *         skipped in that case); otherwise {@code true} if {@code sourceTS} is no newer
+     *         than the current time minus the max lookback age
+     */
+    protected boolean isRowOlderThanMaxLookback(Long sourceTS){
+        // 1000L keeps the seconds-to-millis conversion in long arithmetic, the same way
+        // IndexScrutinyTool.validateTimestamp performs this comparison
+        if (maxLookbackAgeMillis == ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE * 1000L) {
+            return false;
+        }
+        long oldestVisibleTs = EnvironmentEdgeManager.currentTimeMillis() - maxLookbackAgeMillis;
+        return sourceTS <= oldestVisibleTs;
+    }
+
+ private int getTableTtl() throws SQLException, IOException {
PTable pSourceTable = PhoenixRuntime.getTable(connection, qSourceTable);
if (pSourceTable.getType() == PTableType.INDEX
&& pSourceTable.getIndexType() == PTable.IndexType.LOCAL) {
@@ -417,6 +441,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
outputUpsertStmt.setLong(index++, sourceTS); // SOURCE_TS
outputUpsertStmt.setLong(index++, targetTS); // TARGET_TS
outputUpsertStmt.setBoolean(index++, targetValues != null); // HAS_TARGET_ROW
+ outputUpsertStmt.setBoolean(index++, isRowOlderThanMaxLookback(sourceTS));
index = setStatementObjects(sourceValues, index, sourceTblColumnMetadata);
if (targetValues != null) {
index = setStatementObjects(targetValues, index, targetTblColumnMetadata);
@@ -498,4 +523,5 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
md5.reset();
}
}
+
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
index 411214e..986b435 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
@@ -68,6 +68,7 @@ public class IndexScrutinyTableOutput {
" SOURCE_TS BIGINT,\n" +
" TARGET_TS BIGINT,\n" +
" HAS_TARGET_ROW BOOLEAN,\n" +
+ " BEYOND_MAX_LOOKBACK BOOLEAN,\n" +
" CONSTRAINT PK PRIMARY KEY\n" +
" (\n" +
" " + SOURCE_TABLE_COL_NAME + ",\n" +
@@ -75,7 +76,10 @@ public class IndexScrutinyTableOutput {
" " + SCRUTINY_EXECUTE_TIME_COL_NAME + ",\n" + // time at which the scrutiny ran
" SOURCE_ROW_PK_HASH\n" + // this hash makes the PK unique
" )\n" + // dynamic columns consisting of the source and target columns will follow
- ")";
+ ") COLUMN_ENCODED_BYTES = 0 "; //column encoding not supported with dyn columns (PHOENIX-5107)
+ public static final String OUTPUT_TABLE_BEYOND_LOOKBACK_DDL = "" +
+ "ALTER TABLE " + OUTPUT_TABLE_NAME + "\n" +
+ " ADD IF NOT EXISTS BEYOND_MAX_LOOKBACK BOOLEAN";
/**
* This table holds metadata about a scrutiny job - result counters and queries to fetch invalid
@@ -102,6 +106,8 @@ public class IndexScrutinyTableOutput {
" INVALID_ROWS_QUERY_ALL VARCHAR,\n" + // stored sql query to fetch all the invalid rows from the output table
" INVALID_ROWS_QUERY_MISSING_TARGET VARCHAR,\n" + // stored sql query to fetch all the invalid rows which are missing a target row
" INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL VARCHAR,\n" + // stored sql query to fetch all the invalid rows which have bad covered column values
+ " INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK VARCHAR,\n" + // stored sql query to fetch all the potentially invalid rows which are before max lookback age
+ " BEYOND_MAX_LOOKBACK_COUNT BIGINT,\n" +
" CONSTRAINT PK PRIMARY KEY\n" +
" (\n" +
" " + SOURCE_TABLE_COL_NAME + ",\n" +
@@ -109,8 +115,12 @@ public class IndexScrutinyTableOutput {
" " + SCRUTINY_EXECUTE_TIME_COL_NAME + "\n" +
" )\n" +
")\n";
+ public static final String OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL = "" +
+ "ALTER TABLE " + OUTPUT_METADATA_TABLE_NAME + "\n" +
+ " ADD IF NOT EXISTS INVALID_ROWS_QUERY_BEYOND_MAX_LOOKBACK VARCHAR, \n" +
+ " BEYOND_MAX_LOOKBACK_COUNT BIGINT";
- public static final String UPSERT_METADATA_SQL = "UPSERT INTO " + OUTPUT_METADATA_TABLE_NAME + " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+ public static final String UPSERT_METADATA_SQL = "UPSERT INTO " + OUTPUT_METADATA_TABLE_NAME + " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
/**
* Gets the parameterized upsert sql to the output table Used by the scrutiny MR job to write
@@ -177,6 +187,21 @@ public class IndexScrutinyTableOutput {
return paramQuery.replaceFirst("\\?", "true");
}
+    /**
+     * Builds the stored SQL that fetches, for one scrutiny run, the output-table rows that
+     * have no target row and are flagged as beyond the max lookback age.
+     *
+     * @param conn connection passed through to the output-table query builder
+     * @param columnNames source and target column names for the scrutinized tables
+     * @param scrutinyTimeMillis execute time of the scrutiny run to bind into the query
+     * @return the query text with the PK columns and both boolean flags bound
+     * @throws SQLException if the output-table query cannot be constructed
+     */
+    public static String getSqlQueryBeyondMaxLookback(Connection conn,
+            SourceTargetColumnNames columnNames, long scrutinyTimeMillis) throws SQLException {
+        String filterColumns = getPksCsv()
+                + ", " + SchemaUtil.getEscapedFullColumnName("HAS_TARGET_ROW")
+                + ", " + SchemaUtil.getEscapedFullColumnName("BEYOND_MAX_LOOKBACK");
+        String baseQuery = constructOutputTableQuery(conn, columnNames, filterColumns);
+        String placeholders =
+                QueryUtil.constructParameterizedInClause(getPkCols().size() + 2, 1);
+        String bound =
+                bindPkCols(columnNames, scrutinyTimeMillis, baseQuery + " IN " + placeholders);
+        // the two placeholders left after the PK columns are, in order,
+        // has_target_row (false) and beyond_max_lookback (true)
+        return bound
+                .replaceFirst("\\?", "false")
+                .replaceFirst("\\?", "true");
+    }
+
/**
* Query the metadata table for the given columns
* @param conn connection to use
@@ -197,6 +222,22 @@ public class IndexScrutinyTableOutput {
return ps.executeQuery();
}
+    /**
+     * Finds the largest scrutiny execute time recorded in the metadata table for the given
+     * source/target pair and returns all metadata columns for that run.
+     *
+     * @param conn connection to use
+     * @param qSourceTableName qualified source table name to match
+     * @param qTargetTableName qualified target table name to match
+     * @return the result of {@code queryAllMetadata} for the latest execute time found, or
+     *         for an execute time of 0 when the lookup returns no row
+     * @throws SQLException if either query fails
+     */
+    public static ResultSet queryAllLatestMetadata(Connection conn, String qSourceTableName,
+            String qTargetTableName) throws SQLException {
+        String sql = "SELECT MAX(" + SCRUTINY_EXECUTE_TIME_COL_NAME + ") " +
+                "FROM " + OUTPUT_METADATA_TABLE_NAME +
+                " WHERE " + SOURCE_TABLE_COL_NAME + " = ?" + " AND " + TARGET_TABLE_COL_NAME + "= ?";
+        long scrutinyTimeMillis = 0L;
+        // The lookup statement and its result set are only needed to read one value, so
+        // release them here; only the result set returned below is handed to the caller.
+        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+            stmt.setString(1, qSourceTableName);
+            stmt.setString(2, qTargetTableName);
+            try (ResultSet rs = stmt.executeQuery()) {
+                if (rs.next()) {
+                    scrutinyTimeMillis = rs.getLong(1);
+                }
+            }
+        }
+        //even if we didn't find one, still need to do a query to return the right columns
+        return queryAllMetadata(conn, qSourceTableName, qTargetTableName, scrutinyTimeMillis);
+    }
+
/**
* Query the metadata table for all columns
* @param conn connection to use
@@ -258,6 +299,9 @@ public class IndexScrutinyTableOutput {
pStmt.setString(index++, getSqlQueryAllInvalidRows(conn, columnNames, scrutinyExecuteTime));
pStmt.setString(index++, getSqlQueryMissingTargetRows(conn, columnNames, scrutinyExecuteTime));
pStmt.setString(index++, getSqlQueryBadCoveredColVal(conn, columnNames, scrutinyExecuteTime));
+ pStmt.setString(index++, getSqlQueryBeyondMaxLookback(conn, columnNames, scrutinyExecuteTime));
+ pStmt.setLong(index++,
+ counters.findCounter(PhoenixScrutinyJobCounters.BEYOND_MAX_LOOKBACK_COUNT).getValue());
pStmt.addBatch();
}
pStmt.executeBatch();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index dda537f..b0d4d2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -116,9 +117,9 @@ public class IndexScrutinyTool extends Configured implements Tool {
public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
@Inject
- Class<IndexScrutinyMapperForTest> mapperClass = null;
+ Class<? extends IndexScrutinyMapper> mapperClass = null;
- public IndexScrutinyTool(Class<IndexScrutinyMapperForTest> indexScrutinyMapperForTestClass) {
+ public IndexScrutinyTool(Class<? extends IndexScrutinyMapper> indexScrutinyMapperForTestClass) {
this.mapperClass = indexScrutinyMapperForTestClass;
}
@@ -215,11 +216,12 @@ public class IndexScrutinyTool extends Configured implements Tool {
private long scrutinyExecuteTime;
private long outputMaxRows; // per mapper
private String tenantId;
- Class<IndexScrutinyMapperForTest> mapperClass;
+ Class<? extends IndexScrutinyMapper> mapperClass;
public JobFactory(Connection connection, Configuration configuration, long batchSize,
boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
- String basePath, long outputMaxRows, String tenantId, Class<IndexScrutinyMapperForTest> mapperClass) {
+ String basePath, long outputMaxRows, String tenantId,
+ Class<? extends IndexScrutinyMapper> mapperClass) {
this.outputInvalidRows = outputInvalidRows;
this.outputFormat = outputFormat;
this.basePath = basePath;
@@ -242,7 +244,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
}
public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
- SourceTable sourceTable, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
+ SourceTable sourceTable, Class<? extends IndexScrutinyMapper> mapperClass) throws Exception {
Preconditions.checkArgument(SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
|| SourceTable.INDEX_TABLE_SOURCE.equals(sourceTable));
@@ -336,7 +338,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
return configureSubmittableJob(job, outputPath, mapperClass);
}
- private Job configureSubmittableJob(Job job, Path outputPath, Class<IndexScrutinyMapperForTest> mapperClass) throws Exception {
+ private Job configureSubmittableJob(Job job, Path outputPath, Class<? extends IndexScrutinyMapper> mapperClass) throws Exception {
Configuration conf = job.getConfiguration();
conf.setBoolean("mapreduce.job.user.classpath.first", true);
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -414,6 +416,8 @@ public class IndexScrutinyTool extends Configured implements Tool {
? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt()))
: EnvironmentEdgeManager.currentTimeMillis() - 60000;
+ validateTimestamp(configuration, ts);
+
if (indexTable != null) {
if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
throw new IllegalArgumentException(String
@@ -438,8 +442,12 @@ public class IndexScrutinyTool extends Configured implements Tool {
outputConfiguration.unset(PhoenixRuntime.TENANT_ID_ATTRIB);
try (Connection outputConn = ConnectionUtil.getOutputConnection(outputConfiguration)) {
outputConn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
+ outputConn.createStatement().
+ execute(IndexScrutinyTableOutput.OUTPUT_TABLE_BEYOND_LOOKBACK_DDL);
outputConn.createStatement()
.execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
+ outputConn.createStatement().
+ execute(IndexScrutinyTableOutput.OUTPUT_METADATA_BEYOND_LOOKBACK_COUNTER_DDL);
}
}
@@ -505,6 +513,18 @@ public class IndexScrutinyTool extends Configured implements Tool {
}
}
+    /**
+     * Rejects a scrutiny timestamp that is older than the configured max lookback window.
+     * The check is skipped when the max lookback age equals its default value.
+     *
+     * @param configuration configuration the max lookback age is read from
+     * @param ts requested scrutiny timestamp, in milliseconds
+     * @throws IllegalArgumentException if {@code ts} is earlier than the current time minus
+     *         the max lookback age
+     */
+    private void validateTimestamp(Configuration configuration, long ts) {
+        long maxLookBackAge = ScanInfoUtil.getMaxLookbackInMillis(configuration);
+        if (maxLookBackAge != ScanInfoUtil.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE * 1000L) {
+            long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
+            if (ts < minTimestamp){
+                // note the leading space: the two literals are concatenated into one sentence
+                throw new IllegalArgumentException("Index scrutiny can't look back past the configured" +
+                    " max lookback age: " + maxLookBackAge / 1000 + " seconds");
+            }
+        }
+    }
+
@VisibleForTesting
public List<Job> getJobs() {
return jobs;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
index 43965b7..54e8a94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
@@ -41,5 +41,11 @@ public enum PhoenixScrutinyJobCounters {
/**
* Number of batches processed
*/
- BATCHES_PROCESSED_COUNT;
+ BATCHES_PROCESSED_COUNT,
+ /**
+ * Number of rows in source that became older than the max lookback age while scrutiny
+ * was comparing them with the target, and didn't match. We break these out separately because
+ * they could be due to extra versions being compacted, and are harmless.
+ */
+ BEYOND_MAX_LOOKBACK_COUNT;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ManualEnvironmentEdge.java
similarity index 53%
copy from phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
copy to phoenix-core/src/main/java/org/apache/phoenix/util/ManualEnvironmentEdge.java
index 43965b7..0ba7210 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ManualEnvironmentEdge.java
@@ -1,4 +1,3 @@
-/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,30 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.mapreduce.index;
+package org.apache.phoenix.util;
-/**
- * Counters used for Index Scrutiny MR job
- */
-public enum PhoenixScrutinyJobCounters {
- /**
- * number of rows in data table with a valid index row (or vice-versa)
- */
- VALID_ROW_COUNT,
- /**
- * number of rows in data table with an invalid index row (or vice-versa)
- */
- INVALID_ROW_COUNT,
- /**
- * Number of rows in the index table with an incorrect covered column value
- */
- BAD_COVERED_COL_VAL_COUNT,
- /**
- * Number of rows in source that have expired while scrutiny was comparing them with target
- */
- EXPIRED_ROW_COUNT,
- /**
- * Number of batches processed
- */
- BATCHES_PROCESSED_COUNT;
+/**
+ * An {@link EnvironmentEdge} whose reported time changes only when a caller sets or
+ * increments it.
+ */
+public class ManualEnvironmentEdge extends EnvironmentEdge {
+    // Start at 1 rather than 0, since a timestamp of 0 might have a special meaning
+    protected long value = 1L;
+
+    /** Replaces the reported time with {@code newValue}. */
+    public void setValue(long newValue) {
+        this.value = newValue;
+    }
+
+    /** Adds {@code addedValue} to the reported time. */
+    public void incrementValue(long addedValue) {
+        this.value = this.value + addedValue;
+    }
+
+    /** @return the manually controlled time */
+    @Override
+    public long currentTime() {
+        return value;
+    }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 9a23886..732e4cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -49,6 +49,9 @@ import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
@@ -1183,6 +1186,22 @@ public class SchemaUtil {
return isNullable ? ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls;
}
+    /**
+     * Convenience overload: converts the physical table name to bytes and delegates to the
+     * {@code byte[]} variant.
+     *
+     * @param conn connection whose query services are used for the lookup
+     * @param physicalName physical table name
+     * @return the time-to-live reported by the {@code byte[]} overload
+     * @throws SQLException if the table descriptor cannot be fetched
+     */
+    public static int getTimeToLive(PhoenixConnection conn, String physicalName) throws SQLException {
+        return getTimeToLive(conn, Bytes.toBytes(physicalName));
+    }
+
+    /**
+     * Reads the time-to-live from the first column family in the table's descriptor.
+     *
+     * @param conn connection whose query services supply the table descriptor
+     * @param tableQualifier table name as bytes
+     * @return the first column family's time-to-live, or {@code HConstants.FOREVER} when
+     *         the descriptor lists no column families
+     * @throws SQLException if the table descriptor cannot be fetched
+     */
+    public static int getTimeToLive(PhoenixConnection conn, byte[] tableQualifier)
+            throws SQLException {
+        HColumnDescriptor[] families =
+                conn.getQueryServices().getTableDescriptor(tableQualifier).getColumnFamilies();
+        return families.length > 0 ? families[0].getTimeToLive() : HConstants.FOREVER;
+    }
+
public static boolean hasGlobalIndex(PTable table) {
for (PTable index : table.getIndexes()) {
if (index.getIndexType() == IndexType.GLOBAL) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
similarity index 99%
rename from phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
rename to phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
index 99d50ee..bad2c1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
public class IndexScrutinyMapperForTest extends IndexScrutinyMapper {
public static final int TEST_TABLE_TTL = 3600;
+
public static class ScrutinyTestClock extends EnvironmentEdge {
long initialTime;
long delta;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 290ba8d..98104b4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -58,6 +58,7 @@ import java.util.Properties;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.AggregationManager;
import org.apache.phoenix.compile.ColumnResolver;
@@ -801,6 +803,21 @@ public class TestUtil {
conn.createStatement().execute(ddl);
}
+    /**
+     * Requests a major compaction of the table, then polls every 100 ms until the table's
+     * last-major-compaction timestamp is no earlier than the time of the request and the
+     * table is no longer reported in the MAJOR compaction state.
+     *
+     * @param utility testing utility that supplies the HBase admin
+     * @param table table to compact
+     * @throws IOException if an admin call fails
+     * @throws InterruptedException if the polling sleep is interrupted
+     */
+    public static void majorCompact(HBaseTestingUtility utility, TableName table)
+            throws IOException, InterruptedException {
+        // capture the request time first so the loop can tell this compaction from earlier ones
+        long compactionRequestedSCN = EnvironmentEdgeManager.currentTimeMillis();
+        Admin admin = utility.getHBaseAdmin();
+        admin.majorCompact(table);
+        while (admin.getLastMajorCompactionTimestamp(table) < compactionRequestedSCN
+                || admin.getCompactionState(table).
+                    equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)) {
+            Thread.sleep(100);
+        }
+    }
+
/**
* Runs a major compaction, and then waits until the compaction is complete before returning.
*
@@ -895,7 +912,7 @@ public class TestUtil {
current = cellScanner.current();
System.out.println(current + "column= " +
Bytes.toString(CellUtil.cloneQualifier(current)) +
- " val=" + Bytes.toString(CellUtil.cloneValue(current)));
+ " val=" + Bytes.toStringBinary(CellUtil.cloneValue(current)));
}
}
}
@@ -993,18 +1010,25 @@ public class TestUtil {
public static void printResultSet(ResultSet rs) throws SQLException {
while(rs.next()){
- StringBuilder builder = new StringBuilder();
- int columnCount = rs.getMetaData().getColumnCount();
- for(int i = 0; i < columnCount; i++) {
- Object value = rs.getObject(i+1);
- String output = value == null ? "null" : value.toString();
- builder.append(output);
- if(i + 1 < columnCount){
- builder.append(",");
+ printResult(rs, false);
+ }
+ }
+
+ public static void printResult(ResultSet rs, boolean multiLine) throws SQLException {
+ StringBuilder builder = new StringBuilder();
+ int columnCount = rs.getMetaData().getColumnCount();
+ for(int i = 0; i < columnCount; i++) {
+ Object value = rs.getObject(i+1);
+ String output = value == null ? "null" : value.toString();
+ builder.append(output);
+ if(i + 1 < columnCount){
+ builder.append(",");
+ if (multiLine){
+ builder.append("\n");
}
}
- System.out.println(builder.toString());
}
+ System.out.println(builder.toString());
}
public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState indexState) throws InterruptedException, SQLException {