You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/01/13 02:32:47 UTC
[phoenix] branch 4.15-HBase-1.5 updated: PHOENIX-5651: IndexScrutiny does not handle TTL/row-expiry
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.15-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.15-HBase-1.5 by this push:
new 57a1e1e PHOENIX-5651: IndexScrutiny does not handle TTL/row-expiry
57a1e1e is described below
commit 57a1e1e9cbc01b12fd7422725227ee2ff985f914
Author: s.kadam <s....@salesforce.com>
AuthorDate: Fri Jan 10 10:27:09 2020 -0800
PHOENIX-5651: IndexScrutiny does not handle TTL/row-expiry
---
.../phoenix/end2end/IndexScrutinyToolBaseIT.java | 3 +-
.../end2end/IndexScrutinyToolForTenantIT.java | 2 +
.../phoenix/end2end/IndexScrutinyToolIT.java | 56 -------
.../NonParameterizedIndexScrutinyToolIT.java | 164 +++++++++++++++++++++
.../mapreduce/index/IndexScrutinyMapper.java | 99 +++++++++++--
.../index/IndexScrutinyMapperForTest.java | 30 ++++
.../phoenix/mapreduce/index/IndexScrutinyTool.java | 36 +++--
.../index/PhoenixScrutinyJobCounters.java | 4 +
8 files changed, 310 insertions(+), 84 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
index 9f368a8..54a5408 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java
@@ -16,6 +16,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
@@ -54,7 +55,7 @@ public class IndexScrutinyToolBaseIT extends BaseTest {
}
protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
- IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+ IndexScrutinyTool scrutiny = new IndexScrutinyTool(IndexScrutinyMapperForTest.class);
Configuration conf = new Configuration(getUtility().getConfiguration());
scrutiny.setConf(conf);
int status = scrutiny.run(cmdArgs);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
index 206e793..351af9c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java
@@ -89,6 +89,8 @@ public class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT {
String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName);
connTenant.createStatement().execute(idxStmtTenant);
+ connTenant.commit();
+ connGlobal.commit();
}
@After public void teardown() throws SQLException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index 13df56d..76fce98 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
@@ -69,7 +68,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
/**
* Tests for the {@link IndexScrutinyTool}
@@ -164,51 +162,6 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
assertEquals(numIndexRows, countRows(conn, indexTableFullName));
}
- @Test public void testScrutinyOnArrayTypes() throws Exception {
- String dataTableName = generateUniqueName();
- String indexTableName = generateUniqueName();
- String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, VB VARBINARY)";
- String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (VB)";
- String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
- String upsertIndex = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:VB\") values (?,?,?)";
-
- try (Connection conn =
- DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
- conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
- conn.createStatement().execute(String.format(indexTableDDL, indexTableName, dataTableName));
- // insert two rows
- PreparedStatement upsertDataStmt = conn.prepareStatement(String.format(upsertData, dataTableName));
- upsertRow(upsertDataStmt, 1, "name-1", new byte[] {127, 0, 0, 1});
- upsertRow(upsertDataStmt, 2, "name-2", new byte[] {127, 1, 0, 5});
- conn.commit();
-
- List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
- Job job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- Counters counters = job.getCounters();
- assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-
- // Now insert a different varbinary row
- upsertRow(upsertDataStmt, 3, "name-3", new byte[] {1, 1, 1, 1});
- conn.commit();
-
- PreparedStatement upsertIndexStmt = conn.prepareStatement(String.format(upsertIndex, indexTableName));
- upsertIndexStmt.setString(1, "name-3");
- upsertIndexStmt.setInt(2, 3);
- upsertIndexStmt.setBytes(3, new byte[] {0, 0, 0, 1});
- upsertIndexStmt.executeUpdate();
- conn.commit();
-
- completedJobs = runScrutiny(null, dataTableName, indexTableName);
- job = completedJobs.get(0);
- assertTrue(job.isSuccessful());
- counters = job.getCounters();
- assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
- assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
- }
- }
-
/**
* Tests running a scrutiny while updates and deletes are happening.
* Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
@@ -689,15 +642,6 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
stmt.executeUpdate();
}
- private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val) throws SQLException {
- int index = 1;
- // insert row
- stmt.setInt(index++, id);
- stmt.setString(index++, name);
- stmt.setBytes(index++, val);
- stmt.executeUpdate();
- }
-
private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
new file mode 100644
index 0000000..f70e6a2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonParameterizedIndexScrutinyToolIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.After;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.index.IndexScrutinyMapperForTest.TEST_TABLE_TTL;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Index scrutiny tests that run outside the parameterized {@link IndexScrutinyToolIT}.
+ */
+public class NonParameterizedIndexScrutinyToolIT extends IndexScrutinyToolBaseIT {
+
+    /**
+     * Restores the real clock after every test. testScrutinyOnRowsNearExpiry injects a shifted
+     * EnvironmentEdge, and IndexScrutinyMapperForTest injects another when the scrutiny job runs
+     * in-process; neither is undone otherwise, which would skew later tests in this JVM.
+     */
+    @After
+    public void resetEnvironmentEdge() {
+        EnvironmentEdgeManager.reset();
+    }
+
+    /**
+     * Two consistent VARBINARY rows must be valid; an index row whose covered VARBINARY value
+     * differs from the data row must then be reported as invalid.
+     */
+    @Test
+    public void testScrutinyOnArrayTypes() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexTableName = generateUniqueName();
+        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+                + "VB VARBINARY)";
+        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (VB)";
+        String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
+        String upsertIndex = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:VB\") values (?,?,?)";
+
+        try (Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
+            conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+                    dataTableName));
+            // insert two rows
+            PreparedStatement upsertDataStmt = conn.prepareStatement(String.format(upsertData,
+                    dataTableName));
+            upsertRow(upsertDataStmt, 1, "name-1", new byte[] {127, 0, 0, 1});
+            upsertRow(upsertDataStmt, 2, "name-2", new byte[] {127, 1, 0, 5});
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+            // Now insert a different varbinary row
+            upsertRow(upsertDataStmt, 3, "name-3", new byte[] {1, 1, 1, 1});
+            conn.commit();
+
+            PreparedStatement upsertIndexStmt = conn.prepareStatement(String.format(upsertIndex,
+                    indexTableName));
+            upsertIndexStmt.setString(1, "name-3");
+            upsertIndexStmt.setInt(2, 3);
+            upsertIndexStmt.setBytes(3, new byte[] {0, 0, 0, 1});
+            upsertIndexStmt.executeUpdate();
+            conn.commit();
+
+            completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+    }
+
+    /**
+     * Rows written shortly before scrutiny, in a table whose TTL is {@code TEST_TABLE_TTL}, must
+     * be counted as expired (not invalid) once the test mapper moves the clock past the TTL.
+     */
+    @Test
+    public void testScrutinyOnRowsNearExpiry() throws Exception {
+        String dataTableName = generateUniqueName();
+        String indexTableName = generateUniqueName();
+        String dataTableDDL = "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, "
+                + "ZIP INTEGER) TTL=" + TEST_TABLE_TTL;
+        String indexTableDDL = "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP)";
+        String upsertData = "UPSERT INTO %s VALUES (?, ?, ?)";
+        int initialDelta = -5000; // write rows with timestamps 5 seconds in the past
+        IndexScrutinyMapperForTest.ScrutinyTestClock testClock =
+                new IndexScrutinyMapperForTest.ScrutinyTestClock(initialDelta);
+
+        try (Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(String.format(dataTableDDL, dataTableName));
+            conn.createStatement().execute(String.format(indexTableDDL, indexTableName,
+                    dataTableName));
+            // insert two rows
+            PreparedStatement upsertDataStmt = conn.prepareStatement(String.format(upsertData,
+                    dataTableName));
+
+            EnvironmentEdgeManager.injectEdge(testClock);
+            upsertRow(upsertDataStmt, 1, "name-1", 98051);
+            upsertRow(upsertDataStmt, 2, "name-2", 98052);
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(null, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, EXPIRED_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+    }
+
+    private void upsertRow(PreparedStatement stmt, int id, String name, byte[] val)
+            throws SQLException {
+        int index = 1;
+        // insert row
+        stmt.setInt(index++, id);
+        stmt.setString(index++, name);
+        stmt.setBytes(index++, val);
+        stmt.executeUpdate();
+    }
+
+    private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+            throws SQLException {
+        int index = 1;
+        // insert row
+        stmt.setInt(index++, id);
+        stmt.setString(index++, name);
+        stmt.setInt(index++, zip);
+        stmt.executeUpdate();
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName)
+            throws Exception {
+        return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+            Long batchSize, IndexScrutinyTool.SourceTable sourceTable) throws Exception {
+        final String[] cmdArgs =
+                getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable,
+                        false, null, null, null, Long.MAX_VALUE);
+        return runScrutiny(cmdArgs);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index eceb658..b7a8cff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -27,16 +27,22 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.PhoenixJobCounters;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
@@ -45,7 +51,10 @@ import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -54,22 +63,23 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
+
/**
* Mapper that reads from the data table and checks the rows against the index table
*/
public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexScrutinyMapper.class);
- private Connection connection;
+ protected Connection connection;
private List<ColumnInfo> targetTblColumnMetadata;
private long batchSize;
// holds a batch of rows from the table the mapper is iterating over
// Each row is a pair - the row TS, and the row values
- private List<Pair<Long, List<Object>>> currentBatchValues = new ArrayList<>();
- private String targetTableQuery;
- private int numTargetPkCols;
- private boolean outputInvalidRows;
- private OutputFormat outputFormat = OutputFormat.FILE;
+ protected List<Pair<Long, List<Object>>> currentBatchValues = new ArrayList<>();
+ protected String targetTableQuery;
+ protected int numTargetPkCols;
+ protected boolean outputInvalidRows;
+ protected OutputFormat outputFormat = OutputFormat.FILE;
private String qSourceTable;
private String qTargetTable;
private long executeTimestamp;
@@ -78,10 +88,11 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
private List<ColumnInfo> sourceTblColumnMetadata;
// used to write results to the output table
- private Connection outputConn;
- private PreparedStatement outputUpsertStmt;
+ protected Connection outputConn;
+ protected PreparedStatement outputUpsertStmt;
private long outputMaxRows;
private MessageDigest md5;
+ private long ttl;
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
@@ -100,14 +111,12 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
-
// get the index table and column names
String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
final String qIndexTable =
PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
-
// set the target table based on whether we're running the MR over the data or index
// table
SourceTable sourceTable =
@@ -148,6 +157,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
LOGGER.info("Target table base query: " + targetTableQuery);
md5 = MessageDigest.getInstance("MD5");
+ ttl = getTableTtl();
} catch (SQLException | NoSuchAlgorithmException e) {
tryClosingResourceSilently(this.outputUpsertStmt);
tryClosingResourceSilently(this.connection);
@@ -210,7 +220,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
}
}
- private void processBatch(Context context)
+ protected void processBatch(Context context)
throws SQLException, IOException, InterruptedException {
if (currentBatchValues.size() == 0) return;
context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
@@ -227,9 +237,14 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
buildTargetStatement(targetStatement);
+ preQueryTargetTable();
// fetch results from the target table and output invalid rows
queryTargetTable(context, targetStatement, targetPkToSourceValues);
+ //check if there are any invalid rows that have been expired, report them
+ //with EXPIRED_ROW_COUNT
+ checkIfInvalidRowsExpired(context, targetPkToSourceValues);
+
// any source values we have left over are invalid (e.g. data table rows without
// corresponding index row)
context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
@@ -254,7 +269,90 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
         }
     }
 
-    private Map<String, Pair<Long, List<Object>>> buildTargetStatement(PreparedStatement targetStatement)
+    /**
+     * Hook invoked by processBatch right before the target table is queried. No-op by
+     * default; subclasses (e.g. test mappers) may override it.
+     */
+    protected void preQueryTargetTable() { }
+
+    /**
+     * Drops source rows whose TTL has elapsed from {@code targetPkToSourceValues} and counts
+     * each of them under {@link PhoenixScrutinyJobCounters#EXPIRED_ROW_COUNT}, so that rows
+     * which expired while scrutiny was running are not reported as invalid by the caller.
+     *
+     * @param context mapper context whose counters are updated
+     * @param targetPkToSourceValues source rows left over after querying the target table,
+     *        keyed by target PK; expired entries are removed in place
+     */
+    protected void checkIfInvalidRowsExpired(Context context,
+            Map<String, Pair<Long, List<Object>>> targetPkToSourceValues) {
+        Set<Map.Entry<String, Pair<Long, List<Object>>>> entrySet =
+                targetPkToSourceValues.entrySet();
+        Iterator<Map.Entry<String, Pair<Long, List<Object>>>> itr = entrySet.iterator();
+
+        // remove through the iterator to avoid ConcurrentModificationException
+        while (itr.hasNext()) {
+            Map.Entry<String, Pair<Long, List<Object>>> entry = itr.next();
+            Long sourceTS = entry.getValue().getFirst();
+            if (hasRowExpiredOnSource(sourceTS, ttl)) {
+                context.getCounter(PhoenixScrutinyJobCounters.EXPIRED_ROW_COUNT).increment(1);
+                itr.remove();
+            }
+        }
+    }
+
+    /**
+     * Returns whether a source row with timestamp {@code sourceTS} (ms) is older than
+     * {@code ttl} seconds, measured against {@link EnvironmentEdgeManager#currentTimeMillis()}.
+     * A ttl of {@link Integer#MAX_VALUE} (HBase's {@code HConstants.FOREVER}) never expires.
+     */
+    protected boolean hasRowExpiredOnSource(Long sourceTS, Long ttl) {
+        long currentTS = EnvironmentEdgeManager.currentTimeMillis();
+        return ttl != Integer.MAX_VALUE && sourceTS + ttl * 1000 < currentTS;
+    }
+
+    /**
+     * Reads the TTL, in seconds, of the source table's empty column family from its HBase
+     * descriptor. Local index sources return {@link Integer#MAX_VALUE} (no expiry).
+     */
+    private long getTableTtl() throws SQLException, IOException {
+        PTable psourceTable = PhoenixRuntime.getTable(connection, qSourceTable);
+        if (psourceTable.getType() == PTableType.INDEX
+                && psourceTable.getIndexType() == PTable.IndexType.LOCAL) {
+            return Integer.MAX_VALUE;
+        }
+        String schema = psourceTable.getSchemaName().toString();
+        String table = getSourceTableName(psourceTable);
+        String fullTableName = SchemaUtil.getQualifiedTableName(schema, table);
+        // Admin is Closeable: release it instead of leaking one per mapper setup.
+        try (Admin admin =
+                connection.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            HTableDescriptor tableDesc =
+                    admin.getTableDescriptor(TableName.valueOf(fullTableName));
+            return tableDesc.getFamily(SchemaUtil.getEmptyColumnFamily(psourceTable))
+                    .getTimeToLive();
+        }
+    }
+
+    /**
+     * Returns the HBase table name to read the TTL from: the physical table for views, the
+     * parent table for view-index physical tables, and the table's own name otherwise.
+     */
+    private String getSourceTableName(PTable psourceTable) {
+        String sourcePhysicalName = psourceTable.getPhysicalName().getString();
+        String table;
+        if (psourceTable.getType() == PTableType.VIEW) {
+            table = sourcePhysicalName;
+        } else if (MetaDataUtil.isViewIndex(sourcePhysicalName)) {
+            table = SchemaUtil.getParentTableNameFromIndexTable(sourcePhysicalName,
+                    MetaDataUtil.VIEW_INDEX_TABLE_PREFIX);
+        } else {
+            table = psourceTable.getTableName().toString();
+        }
+        return table;
+    }
+
+    protected Map<String, Pair<Long, List<Object>>> buildTargetStatement(PreparedStatement targetStatement)
             throws SQLException {
         Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
                 new HashMap<>(currentBatchValues.size());
@@ -278,11 +348,10 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
return targetPkToSourceValues;
}
- private void queryTargetTable(Context context, PreparedStatement targetStatement,
+ protected void queryTargetTable(Context context, PreparedStatement targetStatement,
Map<String, Pair<Long, List<Object>>> targetPkToSourceValues)
throws SQLException, IOException, InterruptedException {
ResultSet targetResultSet = targetStatement.executeQuery();
-
while (targetResultSet.next()) {
indxWritable.readFields(targetResultSet);
List<Object> targetValues = indxWritable.getValues();
@@ -327,7 +396,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit
}
// pass in null targetValues if the target row wasn't found
- private void writeToOutputTable(Context context, List<Object> sourceValues, List<Object> targetValues, long sourceTS, long targetTS)
+ protected void writeToOutputTable(Context context, List<Object> sourceValues, List<Object> targetValues, long sourceTS, long targetTS)
throws SQLException {
if (context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).getValue() > outputMaxRows) {
return;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
new file mode 100644
index 0000000..bdd5e45
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapperForTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+
+/**
+ * Test-only {@link IndexScrutinyMapper} that shifts the clock forward by {@link #TEST_TABLE_TTL}
+ * seconds right before each target-table query, so that for tables whose TTL is
+ * {@code TEST_TABLE_TTL} the base mapper's TTL check treats rows written earlier as expired.
+ */
+public class IndexScrutinyMapperForTest extends IndexScrutinyMapper {
+
+    /** TTL in seconds used by tests for their data tables, and the clock shift applied here. */
+    public static final int TEST_TABLE_TTL = 3600;
+
+    /**
+     * {@link EnvironmentEdge} reporting the wall-clock time plus a fixed offset.
+     */
+    public static class ScrutinyTestClock extends EnvironmentEdge {
+        private final long delta;
+
+        /**
+         * @param delta offset in milliseconds added to {@link System#currentTimeMillis()};
+         *              may be negative
+         */
+        public ScrutinyTestClock(long delta) {
+            this.delta = delta;
+        }
+
+        @Override
+        public long currentTime() {
+            return System.currentTimeMillis() + delta;
+        }
+    }
+
+    /**
+     * Injects a clock that is {@link #TEST_TABLE_TTL} seconds ahead. The edge is left in place;
+     * callers are responsible for {@code EnvironmentEdgeManager.reset()}.
+     */
+    @Override
+    public void preQueryTargetTable() {
+        // change the current time past ttl
+        ScrutinyTestClock clock = new ScrutinyTestClock(TEST_TABLE_TTL * 1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index c201f02..dda537f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -114,10 +114,24 @@ public class IndexScrutinyTool extends Configured implements Tool {
             "If specified, uses Tenant connection for tenant view index scrutiny (optional)");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
 
+    /**
+     * Mapper class used by the submitted jobs; {@code null} selects {@link IndexScrutinyMapper}.
+     * Tests pass a subclass such as {@code IndexScrutinyMapperForTest} to hook into the mapper.
+     */
+    private final Class<? extends IndexScrutinyMapper> mapperClass;
+
+    public IndexScrutinyTool(Class<? extends IndexScrutinyMapper> mapperClass) {
+        this.mapperClass = mapperClass;
+    }
+
+    public IndexScrutinyTool() {
+        this(null);
+    }
+
     /**
      * Which table to use as the source table
      */
-    public static enum SourceTable {
+    public enum SourceTable {
         DATA_TABLE_SOURCE, INDEX_TABLE_SOURCE,
         /**
          * Runs two separate jobs to iterate over both tables
@@ -125,7 +139,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
         BOTH;
     }
 
-    public static enum OutputFormat {
+    public enum OutputFormat {
         FILE, TABLE
     }
 
@@ -205,10 +219,12 @@ public class IndexScrutinyTool extends Configured implements Tool {
         private long scrutinyExecuteTime;
         private long outputMaxRows; // per mapper
         private String tenantId;
+        private Class<? extends IndexScrutinyMapper> mapperClass;
 
         public JobFactory(Connection connection, Configuration configuration, long batchSize,
                 boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
-                String basePath, long outputMaxRows, String tenantId) {
+                String basePath, long outputMaxRows, String tenantId,
+                Class<? extends IndexScrutinyMapper> mapperClass) {
             this.outputInvalidRows = outputInvalidRows;
             this.outputFormat = outputFormat;
             this.basePath = basePath;
@@ -220,17 +236,19 @@ public class IndexScrutinyTool extends Configured implements Tool {
             this.tenantId = tenantId;
             this.ts = ts; // CURRENT_SCN to set
             scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis(); // time at which scrutiny was run.
-            // Same for
+            // Same for
             // all jobs created from this factory
             PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration,
                 scrutinyExecuteTime);
             if (!Strings.isNullOrEmpty(tenantId)) {
                 PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
             }
+            this.mapperClass = mapperClass;
         }
 
         public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
-                SourceTable sourceTable) throws Exception {
+                SourceTable sourceTable, Class<? extends IndexScrutinyMapper> mapperClass)
+                throws Exception {
             Preconditions.checkArgument(SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
                     || SourceTable.INDEX_TABLE_SOURCE.equals(sourceTable));
 
@@ -321,10 +339,11 @@ public class IndexScrutinyTool extends Configured implements Tool {
                     SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) ? pdataTable
                             : pindexTable);
 
-            return configureSubmittableJob(job, outputPath);
+            return configureSubmittableJob(job, outputPath, mapperClass);
         }
 
-        private Job configureSubmittableJob(Job job, Path outputPath) throws Exception {
+        private Job configureSubmittableJob(Job job, Path outputPath,
+                Class<? extends IndexScrutinyMapper> mapperClass) throws Exception {
             Configuration conf = job.getConfiguration();
             conf.setBoolean("mapreduce.job.user.classpath.first", true);
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -334,7 +353,7 @@ public class IndexScrutinyTool extends Configured implements Tool {
                 job.setOutputFormatClass(TextOutputFormat.class);
                 FileOutputFormat.setOutputPath(job, outputPath);
             }
-            job.setMapperClass(IndexScrutinyMapper.class);
+            job.setMapperClass(mapperClass == null ? IndexScrutinyMapper.class : mapperClass);
             job.setNumReduceTasks(0);
             // Set the Output classes
             job.setMapOutputKeyClass(Text.class);
@@ -437,17 +456,17 @@ public class IndexScrutinyTool extends Configured implements Tool {
                         outputFormat, outputMaxRows));
             JobFactory jobFactory =
                     new JobFactory(connection, configuration, batchSize, useSnapshot, ts,
-                            outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId);
+                            outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId, mapperClass);
             // If we are running the scrutiny with both tables as the source, run two separate jobs,
             // one for each direction
             if (SourceTable.BOTH.equals(sourceTable)) {
                 jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
-                        SourceTable.DATA_TABLE_SOURCE));
+                        SourceTable.DATA_TABLE_SOURCE, mapperClass));
                 jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
-                        SourceTable.INDEX_TABLE_SOURCE));
+                        SourceTable.INDEX_TABLE_SOURCE, mapperClass));
             } else {
                 jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
-                        sourceTable));
+                        sourceTable, mapperClass));
             }
 
             if (!isForeground) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
index 3cf73fd..43965b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
@@ -35,6 +35,10 @@ public enum PhoenixScrutinyJobCounters {
*/
BAD_COVERED_COL_VAL_COUNT,
/**
+ * Number of rows in source that have expired while scrutiny was comparing them with target
+ */
+ EXPIRED_ROW_COUNT,
+ /**
* Number of batches processed
*/
BATCHES_PROCESSED_COUNT;