You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this rendering.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/03/28 20:36:40 UTC
[phoenix] branch master updated: PHOENIX-5748 Simplify index update generation code for consistent global indexes
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 9dd7d9d PHOENIX-5748 Simplify index update generation code for consistent global indexes
9dd7d9d is described below
commit 9dd7d9d8a720c1f98bbe1564f5cfee16b8c5d507
Author: Kadir <ko...@salesforce.com>
AuthorDate: Fri Mar 27 18:46:33 2020 -0700
PHOENIX-5748 Simplify index update generation code for consistent global indexes
---
.../end2end/ConcurrentMutationsExtendedIT.java | 112 +-
.../phoenix/end2end/ConcurrentMutationsIT.java | 3 +-
.../org/apache/phoenix/end2end/IndexToolIT.java | 40 +-
.../end2end/index/GlobalIndexCheckerIT.java | 95 ++
.../phoenix/compile/ServerBuildIndexCompiler.java | 15 +-
.../coprocessor/IndexRebuildRegionScanner.java | 1440 +++++++++++++-------
.../coprocessor/IndexToolVerificationResult.java | 304 +++++
.../UngroupedAggregateRegionObserver.java | 17 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 868 +++++++-----
.../org/apache/phoenix/index/IndexMaintainer.java | 2 +-
.../index/PhoenixIndexImportDirectReducer.java | 5 +-
.../index/PrepareIndexMutationsForRebuildTest.java | 732 ++++++++++
.../phoenix/index/VerifySingleIndexRowTest.java | 638 +++++++++
13 files changed, 3418 insertions(+), 853 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 571961d..d35451a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,13 +19,19 @@ package org.apache.phoenix.end2end;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
+
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.*;
import org.junit.Ignore;
import org.junit.Test;
@@ -51,6 +57,33 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
private final Object lock = new Object();
+ private long verifyIndexTable(String tableName, String indexName, Connection conn) throws Exception {
+ // Checks the index in several ways; returns the row count reported by the first IndexScrutiny pass
+ // Verify every raw index row against the data table without rebuilding any row
+ IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
+ // Check the state of the index rows after they are repaired on read
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+ // Verify the raw index rows again since the read repair may have modified them
+ IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+ 0, IndexTool.IndexVerifyType.ONLY);
+ // Rebuild the entire index table and verify it after the full rebuild
+ IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+ 0, IndexTool.IndexVerifyType.AFTER);
+ // Truncate the physical index table, then rebuild and verify it from scratch
+ PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName);
+ TableName physicalTableName = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ try (Admin admin = pConn.getQueryServices().getAdmin()) {
+ admin.disableTable(physicalTableName);
+ admin.truncateTable(physicalTableName, true);
+ }
+ IndexToolIT.runIndexTool(true, false, "", tableName, indexName, null,
+ 0, IndexTool.IndexVerifyType.AFTER);
+ long actualRowCountAfterRebuild = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+ assertEquals("Index row count changed after truncate and rebuild", actualRowCount, actualRowCountAfterRebuild);
+ return actualRowCount;
+ }
+
@Test
public void testSynchronousDeletesAndUpsertValues() throws Exception {
final String tableName = generateUniqueName();
@@ -130,7 +163,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
t2.start();
doneSignal.await(60, TimeUnit.SECONDS);
- IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+ verifyIndexTable(tableName, indexName, conn);
}
@Test
@@ -191,7 +224,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
t2.start();
doneSignal.await(60, TimeUnit.SECONDS);
- IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+ verifyIndexTable(tableName, indexName, conn);
}
@Test @Repeat(5)
@@ -204,9 +237,10 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
final String indexName = generateUniqueName();
Connection conn = DriverManager.getConnection(getUrl());
conn.createStatement().execute("CREATE TABLE " + tableName
- + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+ + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
+ "CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)");
final CountDownLatch doneSignal = new CountDownLatch(nThreads);
Runnable[] runnables = new Runnable[nThreads];
for (int i = 0; i < nThreads; i++) {
@@ -216,11 +250,12 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
try {
Connection conn = DriverManager.getConnection(getUrl());
for (int i = 0; i < 10000; i++) {
- boolean isNull = RAND.nextBoolean();
- int randInt = RAND.nextInt() % nIndexValues;
conn.createStatement().execute(
"UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
- + (isNull ? null : randInt) + ")");
+ + (RAND.nextBoolean() ? null : (RAND.nextInt() % nIndexValues)) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
if ((i % batchSize) == 0) {
conn.commit();
}
@@ -241,11 +276,72 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
}
assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
- long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+ long actualRowCount = verifyIndexTable(tableName, indexName, conn);
assertEquals(nRows, actualRowCount);
}
@Test
+ public void testConcurrentUpsertsWithNoIndexedColumns() throws Exception {
+ // Concurrent writers upsert rows 0..nRows-1; about 1% of the upserts omit the indexed column v1
+ int nThreads = 4;
+ final int batchSize = 100;
+ final int nRows = 997;
+ final String tableName = generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER," +
+ "CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+ TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1) INCLUDE(v2, v3)");
+ final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+ Runnable[] runnables = new Runnable[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ runnables[i] = new Runnable() {
+ @Override public void run() {
+ // Each writer uses its own connection, which is closed when the writer is done
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ for (int i = 0; i < 1000; i++) {
+ // nextInt(bound) is uniform over [0, bound); nextInt() % bound may be negative
+ if (RAND.nextInt(1000) < 10) {
+ // Do not include the indexed column in upserts
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " (k1, k2, b.v2, c.v3, d.v4) VALUES ("
+ + RAND.nextInt(nRows) + ", 0, "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
+ } else {
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ", "
+ + (RAND.nextBoolean() ? null : RAND.nextInt()) + ")");
+ }
+ if ((i % batchSize) == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+ };
+ }
+ for (int i = 0; i < nThreads; i++) {
+ Thread t = new Thread(runnables[i]);
+ t.start();
+ }
+
+ assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+ verifyIndexTable(tableName, indexName, conn);
+ }
+
+ @Test
public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
final String indexName = generateUniqueName();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index d1cffe5..e882463 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -266,7 +266,8 @@ public class ConcurrentMutationsIT extends ParallelStatsDisabledIT {
EnvironmentEdgeManager.injectEdge(null);
}
}
-
+ @Ignore ("It is not possible to assign the same timestamp two separately committed mutations in the current model\n" +
+ " except when the server time goes backward. In that case, the behavior is not deterministic")
@Test
public void testDeleteRowAndUpsertValueAtSameTS1() throws Exception {
try {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index c123741..7394f57 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -227,8 +229,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
- // Check after compaction
- TestUtil.doMajorCompaction(conn, dataTableFullName);
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
setEveryNthRowWithNull(NROWS, 5, stmt);
@@ -239,7 +239,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
- TestUtil.doMajorCompaction(conn, dataTableFullName);
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
@@ -457,6 +456,21 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ private void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) {
+ // The row key for the output table : timestamp | index table name | data row key
+ // The row key for the result table : timestamp | index table name | data table region name |
+ // scan start row | scan stop row
+ // Only the common prefix "timestamp | index table name | " is checked; later fields may contain the separator
+ int separatorOffset = Bytes.indexOf(rowKey, IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE);
+ assertTrue("Row key separator not found", separatorOffset >= 0);
+ int offset = separatorOffset + 1;
+ byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
+ assertTrue("Row key is too short", rowKey.length > offset + indexTableFullNameBytes.length);
+ assertEquals(0, Bytes.compareTo(rowKey, offset, indexTableFullNameBytes.length, indexTableFullNameBytes, 0,
+ indexTableFullNameBytes.length));
+ assertEquals(IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE[0], rowKey[offset + indexTableFullNameBytes.length]);
+ }
+
private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
throws Exception {
byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
@@ -490,6 +504,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
}
assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null);
+ verifyIndexTableRowKey(CellUtil.cloneRow(errorMessageCell), indexTableFullName);
+ hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
+ scan = new Scan();
+ scanner = hIndexTable.getScanner(scan);
+ Result result = scanner.next();
+ assert(result != null);
+ verifyIndexTableRowKey(CellUtil.cloneRow(result.rawCells()[0]), indexTableFullName);
return errorMessageCell;
}
@@ -524,7 +546,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
null, 0, IndexTool.IndexVerifyType.AFTER);
assertEquals(1, MutationCountingRegionObserver.getMutationCount());
MutationCountingRegionObserver.setMutationCount(0);
- // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should
+ // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
// write any index rows
runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
null, 0, IndexTool.IndexVerifyType.BEFORE);
@@ -612,16 +634,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
// Run the index tool to populate the index while verifying rows
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.AFTER);
- // Corrupt one cell by writing directly into the index table
- conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
- conn.commit();
- // Run the index tool using the only-verify option to detect this mismatch between the data and index table
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, -1, IndexTool.IndexVerifyType.ONLY);
- cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
- expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
- assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
- expectedValueBytes, 0, expectedValueBytes.length) == 0);
+ null, 0, IndexTool.IndexVerifyType.ONLY);
dropIndexToolTables(conn);
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 8e4a89c..8fbeae0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -113,6 +114,100 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
+ public void testDelete() throws Exception { // An index created after deleting one of two data rows should hold one row
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+ String dml = "DELETE from " + dataTableName + " WHERE id = 'a'"; // delete row 'a' before the index exists
+ assertEquals(1, conn.createStatement().executeUpdate(dml));
+ conn.commit();
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); // async: built by the IndexTool below
+ if (async) {
+ // Build the index by running the IndexTool MapReduce job
+ IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+ }
+ // Count the number of index rows
+ String query = "SELECT COUNT(*) from " + indexTableName;
+ // Only data row 'b' remains, so the index table should contain exactly one row
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ // Add more rows and check that the data and index tables are still consistent
+ verifyTableHealth(conn, dataTableName, indexTableName);
+ }
+ }
+
+ @Test
+ public void testDeleteNonExistingRow() throws Exception { // Deleting an already-deleted row must keep the index consistent
+ if (async) {
+ // The index below is always built synchronously, so run this test only once instead of once per async value
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)");
+ String dml = "DELETE from " + dataTableName + " WHERE id = 'a'";
+ conn.createStatement().executeUpdate(dml); // first delete removes row 'a'
+ conn.commit();
+ // Delete row 'a' again; it no longer exists
+ conn.createStatement().executeUpdate(dml);
+ conn.commit();
+ // Verify (without rebuilding any row) that the repeated delete left the data and index tables consistent
+ IndexToolIT.runIndexTool(true, false, "", dataTableName, indexTableName, null,
+ 0, IndexTool.IndexVerifyType.ONLY);
+ }
+ }
+
+ @Test
+ public void testSimulateConcurrentUpdates() throws Exception { // Same-row batched updates must survive failed post-index updates
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+ if (async) {
+ // Build the index by running the IndexTool MapReduce job
+ IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName);
+ }
+ // For concurrent updates on the same row, the last write phase is ignored. Make IndexRegionObserver fail
+ // that last phase (the post-index-update phase, where the verified flag is set to true and/or index rows
+ // are deleted) for every commit below, and check that the index is still correct.
+ // Index rows below are written as (val1, id, val2, val3).
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); // NOTE(review): not reset here; presumably a teardown in this class resets it — confirm
+ // Two upserts to data row 'a' committed together
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'aa')");
+ conn.commit();
+ // The expected state of the index table is {('aa', 'a', 'abcc', 'abcd'), ('bc', 'b', 'bcd', 'bcde')}
+ // More same-row updates in one commit: two on data row 'a' and two on data row 'b'
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', null, null)");
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('a', 'ab')");
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1) values ('b', 'ab')");
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('b', 'ab', null)");
+ conn.commit();
+ // Now the expected state of the index table is {('ab', 'a', 'abcc', null), ('ab', 'b', null, 'bcde')}
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * from " + indexTableName);
+ assertTrue(rs.next());
+ assertEquals("ab", rs.getString(1));
+ assertEquals("a", rs.getString(2));
+ assertEquals("abcc", rs.getString(3));
+ assertEquals(null, rs.getString(4));
+ assertTrue(rs.next());
+ assertEquals("ab", rs.getString(1));
+ assertEquals("b", rs.getString(2));
+ assertEquals(null, rs.getString(3));
+ assertEquals("bcde", rs.getString(4));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
public void testFailPostIndexDeleteUpdate() throws Exception {
String dataTableName = generateUniqueName();
populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 4392e23..99caa6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -17,27 +17,30 @@
*/
package org.apache.phoenix.compile;
-import java.sql.SQLException;
-import java.util.Collections;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.schema.*;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
+import java.sql.SQLException;
+import java.util.Collections;
+
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
@@ -96,9 +99,9 @@ public class ServerBuildIndexCompiler {
throw new IllegalArgumentException(
"ServerBuildIndexCompiler does not support global indexes on transactional tables");
}
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
// By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
// However, in this case, we need to project all of the data columns that contribute to the index.
- IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
scan.addFamily(columnRef.getFamily());
@@ -121,6 +124,8 @@ public class ServerBuildIndexCompiler {
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+ scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+ BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
}
if (dataTable.isTransactional()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 83c479a..793ab8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -41,14 +41,16 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
+import java.util.NavigableSet;
import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.SkipScanFilter;
@@ -84,6 +87,8 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
@@ -92,6 +97,7 @@ import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
@@ -99,246 +105,18 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
public class IndexRebuildRegionScanner extends BaseRegionScanner {
- public static class VerificationResult {
- public static class PhaseResult {
- private long validIndexRowCount = 0;
- private long expiredIndexRowCount = 0;
- private long missingIndexRowCount = 0;
- private long invalidIndexRowCount = 0;
-
- public void add(PhaseResult phaseResult) {
- validIndexRowCount += phaseResult.validIndexRowCount;
- expiredIndexRowCount += phaseResult.expiredIndexRowCount;
- missingIndexRowCount += phaseResult.missingIndexRowCount;
- invalidIndexRowCount += phaseResult.invalidIndexRowCount;
- }
-
- public long getTotalCount() {
- return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
- }
-
- @Override
- public String toString() {
- return "PhaseResult{" +
- "validIndexRowCount=" + validIndexRowCount +
- ", expiredIndexRowCount=" + expiredIndexRowCount +
- ", missingIndexRowCount=" + missingIndexRowCount +
- ", invalidIndexRowCount=" + invalidIndexRowCount +
- '}';
- }
- }
-
- private long scannedDataRowCount = 0;
- private long rebuiltIndexRowCount = 0;
- private PhaseResult before = new PhaseResult();
- private PhaseResult after = new PhaseResult();
-
- @Override
- public String toString() {
- return "VerificationResult{" +
- "scannedDataRowCount=" + scannedDataRowCount +
- ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
- ", before=" + before +
- ", after=" + after +
- '}';
- }
-
- public long getScannedDataRowCount() {
- return scannedDataRowCount;
- }
-
- public long getRebuiltIndexRowCount() {
- return rebuiltIndexRowCount;
- }
-
- public long getBeforeRebuildValidIndexRowCount() {
- return before.validIndexRowCount;
- }
-
- public long getBeforeRebuildExpiredIndexRowCount() {
- return before.expiredIndexRowCount;
- }
-
- public long getBeforeRebuildInvalidIndexRowCount() {
- return before.invalidIndexRowCount;
- }
-
- public long getBeforeRebuildMissingIndexRowCount() {
- return before.missingIndexRowCount;
- }
-
- public long getAfterRebuildValidIndexRowCount() {
- return after.validIndexRowCount;
- }
-
- public long getAfterRebuildExpiredIndexRowCount() {
- return after.expiredIndexRowCount;
- }
-
- public long getAfterRebuildInvalidIndexRowCount() {
- return after.invalidIndexRowCount;
- }
-
- public long getAfterRebuildMissingIndexRowCount() {
- return after.missingIndexRowCount;
- }
-
- private void addScannedDataRowCount(long count) {
- this.scannedDataRowCount += count;
- }
-
- private void addRebuiltIndexRowCount(long count) {
- this.rebuiltIndexRowCount += count;
- }
-
- private void addBeforeRebuildValidIndexRowCount(long count) {
- before.validIndexRowCount += count;
- }
-
- private void addBeforeRebuildExpiredIndexRowCount(long count) {
- before.expiredIndexRowCount += count;
- }
-
- private void addBeforeRebuildMissingIndexRowCount(long count) {
- before.missingIndexRowCount += count;
- }
-
- private void addBeforeRebuildInvalidIndexRowCount(long count) {
- before.invalidIndexRowCount += count;
- }
-
- private void addAfterRebuildValidIndexRowCount(long count) {
- after.validIndexRowCount += count;
- }
-
- private void addAfterRebuildExpiredIndexRowCount(long count) {
- after.expiredIndexRowCount += count;
- }
-
- private void addAfterRebuildMissingIndexRowCount(long count) {
- after.missingIndexRowCount += count;
- }
-
- private void addAfterRebuildInvalidIndexRowCount(long count) {
- after.invalidIndexRowCount += count;
- }
-
- private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
- AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
- return true;
- }
- return false;
- }
-
- private long getValue(Cell cell) {
- return Long.parseLong(Bytes.toString(cell.getValueArray(),
- cell.getValueOffset(), cell.getValueLength()));
- }
-
- private void update(Cell cell) {
- if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
- addScannedDataRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
- addRebuiltIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildValidIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildExpiredIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildMissingIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildInvalidIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildValidIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildExpiredIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildMissingIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildInvalidIndexRowCount(getValue(cell));
- }
- }
-
- public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
- // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
- // Search for the place where the trailing 0xFFs start
- int offset = rowKeyPrefix.length;
- while (offset > 0) {
- if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
- break;
- }
- offset--;
- }
- if (offset == 0) {
- // We got an 0xFFFF... (only FFs) stopRow value which is
- // the last possible prefix before the end of the table.
- // So set it to stop at the 'end of the table'
- return HConstants.EMPTY_END_ROW;
- }
- // Copy the right length of the original
- byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
- // And increment the last one
- newStopRow[newStopRow.length - 1]++;
- return newStopRow;
- }
-
- public static VerificationResult getVerificationResult(Table hTable, long ts)
- throws IOException {
- VerificationResult verificationResult = new VerificationResult();
- byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
- byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
- Scan scan = new Scan();
- scan.setStartRow(startRowKey);
- scan.setStopRow(stopRowKey);
- ResultScanner scanner = hTable.getScanner(scan);
- for (Result result = scanner.next(); result != null; result = scanner.next()) {
- for (Cell cell : result.rawCells()) {
- verificationResult.update(cell);
- }
- }
- return verificationResult;
- }
-
- public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
- if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
- return false;
- }
- if (verifyType == IndexTool.IndexVerifyType.ONLY) {
- if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) {
- return true;
- }
- }
- if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
- if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
- return true;
- }
- if (before.validIndexRowCount + before.expiredIndexRowCount +
- after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) {
- return true;
- }
- }
- return false;
- }
-
- public void add(VerificationResult verificationResult) {
- scannedDataRowCount += verificationResult.scannedDataRowCount;
- rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
- before.add(verificationResult.before);
- after.add(verificationResult.after);
- }
- }
-
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+ public static final String NO_EXPECTED_MUTATION = "No expected mutation";
+ public static final String
+ ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
+ public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
private long pageSizeInRows = Long.MAX_VALUE;
private int rowCountPerTask;
private boolean hasMore;
@@ -359,25 +137,32 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private Table resultHTable = null;
private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
- private Map<byte[], Put> indexKeyToDataPutMap;
- private Map<byte[], Put> dataKeyToDataPutMap;
+ private Map<byte[], List<Mutation>> indexKeyToMutationMap;
+ private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap;
private TaskRunner pool;
private TaskBatch<Boolean> tasks;
private String exceptionMessage;
private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
private RegionCoprocessorEnvironment env;
- private int indexTableTTL;
- private VerificationResult verificationResult;
+ private int indexTableTTL = 0;
+ private IndexToolVerificationResult verificationResult;
private boolean isBeforeRebuilt = true;
-
- IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
- final RegionCoprocessorEnvironment env,
- UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
+ private boolean partialRebuild = false;
+ private int singleRowRebuildReturnCode;
+ private Map<byte[], NavigableSet<byte[]>> familyMap;
+ private byte[][] viewConstants;
+
+ @VisibleForTesting
+ public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+ final RegionCoprocessorEnvironment env,
+ UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
super(innerScanner);
final Configuration config = env.getConfiguration();
if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+ } else {
+ partialRebuild = true;
}
maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
@@ -390,23 +175,30 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
useProto = false;
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
- if (!scan.isRaw()) {
- // No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds
- List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
- indexMaintainer = maintainers.get(0);
- }
+ List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+ indexMaintainer = maintainers.get(0);
this.scan = scan;
+ familyMap = scan.getFamilyMap();
+ if (familyMap.isEmpty()) {
+ familyMap = null;
+ }
+
this.innerScanner = innerScanner;
this.region = region;
this.env = env;
this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
+ if (indexRowKey != null) {
+ setReturnCodeForSingleRowRebuild();
+ pageSizeInRows = 1;
+ }
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
- verificationResult = new VerificationResult();
+ verificationResult = new IndexToolVerificationResult();
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
+ viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
// Create the following objects only for rebuilds by IndexTool
indexHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
env).getTable(TableName.valueOf(indexMaintainer.getIndexTableName()));
@@ -415,8 +207,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
env).getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES));
resultHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
env).getTable(TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES));
- indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
- dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
new ThreadPoolBuilder("IndexVerify",
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
@@ -428,28 +220,72 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
}
+    /** Scans the data row selected by the scan and records the outcome in singleRowRebuildReturnCode. */
+    private void setReturnCodeForSingleRowRebuild() throws IOException {
+        try (RegionScanner scanner = region.getScanner(scan)) {
+            List<Cell> row = new ArrayList<>();
+            scanner.next(row);
+            if (row.isEmpty()) {
+                // No data table row was found, so there is nothing to build an index row from
+                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue();
+                return;
+            }
+            // Assemble the scanned cells into a Put so the data row can be checked against indexRowKey
+            Put dataRowPut = new Put(CellUtil.cloneRow(row.get(0)));
+            for (Cell cell : row) {
+                dataRowPut.add(cell);
+            }
+            if (checkIndexRow(indexRowKey, dataRowPut)) {
+                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+            } else {
+                singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+            }
+        }
+    }
+
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
}
@Override
- public boolean isFilterDone() { return false; }
+ public boolean isFilterDone() {
+ return false;
+ }
+
+    /**
+     * Builds the row key for the IndexTool result table by joining the following components with
+     * ROW_KEY_SEPARATOR_BYTE: timestamp | index table name | data table region name | scan start row |
+     * scan stop row. The timestamp is written in its decimal string form.
+     * @return a newly allocated array holding all components and separators
+     */
+    private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte[] regionName,
+            byte[] startRow, byte[] stopRow) {
+        byte[][] components = new byte[][] {
+                Bytes.toBytes(Long.toString(ts)), indexTableName, regionName, startRow, stopRow
+        };
+        // A separator goes between every pair of adjacent components
+        int keyLength = (components.length - 1) * ROW_KEY_SEPARATOR_BYTE.length;
+        for (byte[] component : components) {
+            keyLength += component.length;
+        }
+        byte[] rowKey = new byte[keyLength];
+        int offset = 0;
+        for (int i = 0; i < components.length; i++) {
+            if (i > 0) {
+                offset = Bytes.putBytes(rowKey, offset, ROW_KEY_SEPARATOR_BYTE, 0,
+                        ROW_KEY_SEPARATOR_BYTE.length);
+            }
+            // Bytes.putBytes returns the offset just past the bytes it copied
+            offset = Bytes.putBytes(rowKey, offset, components[i], 0, components[i].length);
+        }
+        return rowKey;
+    }
private void logToIndexToolResultTable() throws IOException {
long scanMaxTs = scan.getTimeRange().getMax();
- byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
- byte[] regionName = Bytes.toBytes(region.getRegionInfo().getRegionNameAsString());
- // The row key for the result table is the max timestamp of the scan + the table region name + scan start row
- // + scan stop row
- byte[] rowKey = new byte[keyPrefix.length + regionName.length + scan.getStartRow().length +
- scan.getStopRow().length];
- Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
- Bytes.putBytes(rowKey, keyPrefix.length, regionName, 0, regionName.length);
- Bytes.putBytes(rowKey, keyPrefix.length + regionName.length, scan.getStartRow(), 0,
- scan.getStartRow().length);
- Bytes.putBytes(rowKey, keyPrefix.length + regionName.length + scan.getStartRow().length,
- scan.getStopRow(), 0, scan.getStopRow().length);
+ byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(),
+ Bytes.toBytes(region.getRegionInfo().getRegionNameAsString()), scan.getStartRow(), scan.getStopRow());
Put put = new Put(rowKey);
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
@@ -504,38 +340,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
m.setDurability(Durability.SKIP_WAL);
}
- private Delete generateDeleteMarkers(Put put) {
- Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
- int cellCount = put.size();
- if (cellCount == allColumns.size() + 1) {
- // We have all the columns for the index table. So, no delete marker is needed
- return null;
- }
- Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
- long ts = 0;
- for (List<Cell> cells : put.getFamilyCellMap().values()) {
- if (cells == null) {
- break;
- }
- for (Cell cell : cells) {
- includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
- if (ts < cell.getTimestamp()) {
- ts = cell.getTimestamp();
- }
- }
- }
- Delete del = null;
- for (ColumnReference column : allColumns) {
- if (!includedColumns.contains(column)) {
- if (del == null) {
- del = new Delete(put.getRow());
- }
- del.addColumns(column.getFamily(), column.getQualifier(), ts);
- }
- }
- return del;
- }
-
private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
@@ -546,12 +350,32 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return uuidValue;
}
- private class SimpleValueGetter implements ValueGetter {
+ @VisibleForTesting
+ public int setIndexTableTTL(int ttl) {
+ indexTableTTL = ttl;
+ return 0;
+ }
+
+ @VisibleForTesting
+ public int setIndexMaintainer(IndexMaintainer indexMaintainer) {
+ this.indexMaintainer = indexMaintainer;
+ return 0;
+ }
+
+ @VisibleForTesting
+ public int setIndexKeyToMutationMap(Map<byte[], List<Mutation>> newTreeMap) {
+ this.indexKeyToMutationMap = newTreeMap;
+ return 0;
+ }
+
+ public static class SimpleValueGetter implements ValueGetter {
final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
final Put put;
- SimpleValueGetter (final Put put) {
+
+ public SimpleValueGetter(final Put put) {
this.put = put;
}
+
@Override
public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
@@ -570,7 +394,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
- private byte[] getIndexRowKey(final Put dataRow) throws IOException {
+ public byte[] getIndexRowKey(final Put dataRow) throws IOException {
ValueGetter valueGetter = new SimpleValueGetter(dataRow);
byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
null, null, HConstants.LATEST_TIMESTAMP);
@@ -586,15 +410,36 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return true;
}
- private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+ @VisibleForTesting
+ public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg) throws IOException {
logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, null, null);
}
- private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
- String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
+    /** Builds the IndexTool output table row key: timestamp | index table name | data row key. */
+    private static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey) {
+        byte[][] components = new byte[][] {Bytes.toBytes(Long.toString(ts)), indexTableName, dataRowKey};
+        // A separator goes between every pair of adjacent components
+        int keyLength = (components.length - 1) * ROW_KEY_SEPARATOR_BYTE.length;
+        for (byte[] component : components) {
+            keyLength += component.length;
+        }
+        byte[] rowKey = new byte[keyLength];
+        int offset = 0;
+        for (int i = 0; i < components.length; i++) {
+            if (i > 0) {
+                offset = Bytes.putBytes(rowKey, offset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+            }
+            offset = Bytes.putBytes(rowKey, offset, components[i], 0, components[i].length);
+        }
+        return rowKey;
+    }
+
+ @VisibleForTesting
+ public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+ String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
final int PREFIX_LENGTH = 3;
@@ -602,32 +447,19 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
long scanMaxTs = scan.getTimeRange().getMax();
- byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
- byte[] rowKey;
- // The row key for the output table is the max timestamp of the scan + data row key
- if (dataRowKey != null) {
- rowKey = new byte[keyPrefix.length + dataRowKey.length];
- Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
- Bytes.putBytes(rowKey, keyPrefix.length, dataRowKey, 0, dataRowKey.length);
- } else {
- rowKey = new byte[keyPrefix.length];
- Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
- }
+ byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey);
Put put = new Put(rowKey);
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
scanMaxTs, region.getRegionInfo().getTable().getName());
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
scanMaxTs, indexMaintainer.getIndexTableName());
- if (dataRowKey != null) {
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
- }
- if (indexRowKey != null) {
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
- scanMaxTs, indexRowKey);
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
- }
+
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
+ scanMaxTs, indexRowKey);
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
byte[] errorMessageBytes;
if (expectedValue != null) {
errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
@@ -642,8 +474,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
length += PREFIX_LENGTH;
Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
- }
- else {
+ } else {
errorMessageBytes = Bytes.toBytes(errorMsg);
}
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
@@ -655,21 +486,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
outputHTable.put(put);
}
- private long getMaxTimestamp(Result result) {
- long ts = 0;
- for (Cell cell : result.rawCells()) {
- if (ts < cell.getTimestamp()) {
- ts = cell.getTimestamp();
- }
- }
- return ts;
- }
-
- private long getMaxTimestamp(Put put) {
+ private static long getMaxTimestamp(Mutation m) {
long ts = 0;
- for (List<Cell> cells : put.getFamilyCellMap().values()) {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
if (cells == null) {
- break;
+ continue;
}
for (Cell cell : cells) {
if (ts < cell.getTimestamp()) {
@@ -680,132 +501,477 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return ts;
}
- private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
- ValueGetter valueGetter = new SimpleValueGetter(dataRow);
- long ts = getMaxTimestamp(dataRow);
- Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
- valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
- if (indexPut == null) {
- // This means the data row does not have any covered column values
- indexPut = new Put(indexRow.getRow());
+ private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
+ List<Cell> cellList = m.getFamilyCellMap().get(family);
+ if (cellList == null) {
+ return null;
}
- else {
- // Remove the empty column prepared by Index codec as we need to change its value
- removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
- indexMaintainer.getEmptyKeyValueQualifier());
+ for (Cell cell : cellList) {
+ if (CellUtil.matchingQualifier(cell, qualifier)) {
+ return cell;
+ }
}
- // Add the empty column
- indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
- indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
- int cellCount = 0;
- long currentTime = EnvironmentEdgeManager.currentTime();
- for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+ return null;
+ }
+
+ private boolean isMatchingMutation(Mutation expected, Mutation actual, int iteration) throws IOException {
+ if (getTimestamp(expected) != getTimestamp(actual)) {
+ String errorMsg = "Not matching timestamp";
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+ logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+ errorMsg, null, null);
+ return false;
+ }
+ int expectedCellCount = 0;
+ for (List<Cell> cells : expected.getFamilyCellMap().values()) {
if (cells == null) {
- break;
+ continue;
}
for (Cell expectedCell : cells) {
+ expectedCellCount++;
byte[] family = CellUtil.cloneFamily(expectedCell);
byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
- Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
- if (actualCell == null) {
- // Check if cell expired as per the current server's time and data table ttl
- // Index table should have the same ttl as the data table, hence we might not
- // get a value back from index if it has already expired between our rebuild and
- // verify
- // TODO: have a metric to update for these cases
- if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) {
- continue;
- }
- String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
- Bytes.toString(qualifier);
- logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+ Cell actualCell = getCell(actual, family, qualifier);
+ if (actualCell == null ||
+ !CellUtil.matchingType(expectedCell, actualCell)) {
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+ String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+ logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
return false;
}
- if (actualCell.getTimestamp() < ts) {
- // Skip older cells since a Phoenix index row is composed of cells with the same timestamp
- continue;
- }
- // Check all columns
if (!CellUtil.matchingValue(actualCell, expectedCell)) {
- String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
- Bytes.toString(qualifier);
- logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+ String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+ logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
return false;
- } else if (actualCell.getTimestamp() != ts) {
- String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" +
- Bytes.toString(qualifier) + " E: " + ts + " A: " +
- actualCell.getTimestamp();
- logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
- errorMsg, null, null);
- return false;
}
- cellCount++;
}
}
- if (cellCount != indexRow.rawCells().length) {
- String errorMsg = "Expected to find " + cellCount + " cells but got "
- + indexRow.rawCells().length + " cells";
- logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+ int actualCellCount = 0;
+ for (List<Cell> cells : actual.getFamilyCellMap().values()) {
+ if (cells == null) {
+ continue;
+ }
+ actualCellCount += cells.size();
+ }
+ if (expectedCellCount != actualCellCount) {
+ String errorMsg = "Index has extra cells (in iteration " + iteration + ")";
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+ logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+ errorMsg);
+ return false;
+ }
+ return true;
+ }
+
+    /** Returns true when the empty column of the given index Put holds VERIFIED_BYTES. */
+    private boolean isVerified(Put mutation) throws IOException {
+        byte[] emptyFamily = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+        List<Cell> emptyColumnCells = mutation.get(emptyFamily, indexMaintainer.getEmptyKeyValueQualifier());
+        if (emptyColumnCells == null || emptyColumnCells.isEmpty()) {
+            // A Put without an empty column cell cannot be classified, so it is reported as an error
+            throw new DoNotRetryIOException("No empty column cell");
+        }
+        // Only the first returned cell is inspected
+        Cell emptyCell = emptyColumnCells.get(0);
+        return Bytes.compareTo(emptyCell.getValueArray(), emptyCell.getValueOffset(), emptyCell.getValueLength(),
+                VERIFIED_BYTES, 0, VERIFIED_BYTES.length) == 0;
+    }
+
+    /**
+     * Orders mutations newest-first by timestamp. When two mutations carry the same timestamp, a Delete
+     * is placed ahead of a Put; any other tie is reported as equal.
+     */
+    public static final Comparator<Mutation> MUTATION_TS_DESC_COMPARATOR = new Comparator<Mutation>() {
+        @Override
+        public int compare(Mutation left, Mutation right) {
+            long leftTs = getTimestamp(left);
+            long rightTs = getTimestamp(right);
+            if (leftTs != rightTs) {
+                // Newer (larger) timestamps sort first
+                return leftTs > rightTs ? -1 : 1;
+            }
+            boolean deleteThenPut = left instanceof Delete && right instanceof Put;
+            if (deleteThenPut) {
+                return -1;
+            }
+            boolean putThenDelete = left instanceof Put && right instanceof Delete;
+            if (putThenDelete) {
+                return 1;
+            }
+            return 0;
+        }
+    };
+
+    /** Returns true if any cell of the mutation has the given type; null cell lists are skipped. */
+    private static boolean hasCellOfType(Mutation mutation, KeyValue.Type type) {
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) == type) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+    // Convenience checks for specific delete marker types
+    private boolean isDeleteFamily(Mutation mutation) {
+        return hasCellOfType(mutation, KeyValue.Type.DeleteFamily);
+    }
+    private boolean isDeleteFamilyVersion(Mutation mutation) {
+        return hasCellOfType(mutation, KeyValue.Type.DeleteFamilyVersion);
+    }
+
+    /** Splits the raw cells of an index row into one Put and one Delete and groups them with getMutationsWithSameTS. */
+    @VisibleForTesting
+    public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
+        Put indexPut = null;
+        Delete indexDelete = null;
+        for (Cell cell : indexRow.rawCells()) {
+            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                indexPut = (indexPut == null) ? new Put(CellUtil.cloneRow(cell)) : indexPut;
+                indexPut.add(cell);
+                continue;
+            }
+            // Any non-Put cell is treated as a delete marker
+            if (indexDelete == null) {
+                indexDelete = new Delete(CellUtil.cloneRow(cell));
+            }
+            indexDelete.addDeleteMarker(cell);
+        }
+        return getMutationsWithSameTS(indexPut, indexDelete);
+    }
+    /**
+     * Repairs the actual list in memory using the expected list (the output of rebuilding the index table row).
+     * Repaired copies are appended to actualMutationList, which is re-sorted newest-first; used only for verification.
+     */
+    private void repairActualMutationList(List<Mutation> actualMutationList, List<Mutation> expectedMutationList)
+            throws IOException {
+        // Look for the first unverified put in list order (the latest one when the list is sorted newest-first)
+        List<Mutation> repairedMutationList = new ArrayList<>(expectedMutationList.size());
+        for (Mutation actual : actualMutationList) {
+            if (actual instanceof Put && !isVerified((Put) actual)) {
+                long ts = getTimestamp(actual);
+                int expectedIndex;
+                int expectedListSize = expectedMutationList.size();
+                for (expectedIndex = 0; expectedIndex < expectedListSize; expectedIndex++) {
+                    if (getTimestamp(expectedMutationList.get(expectedIndex)) <= ts) {
+                        if (expectedIndex > 0) { // also include the preceding mutation, whose timestamp is > ts
+                            expectedIndex--;
+                        }
+                        break;
+                    }
+                }
+                if (expectedIndex == expectedListSize) { // no expected mutation has a timestamp <= ts
+                    continue;
+                }
+                for (; expectedIndex < expectedListSize; expectedIndex++) {
+                    Mutation mutation = expectedMutationList.get(expectedIndex);
+                    if (mutation instanceof Put) {
+                        mutation = new Put((Put) mutation);
+                    } else {
+                        mutation = new Delete((Delete) mutation);
+                    }
+                    repairedMutationList.add(mutation);
+                }
+                // The expected history from this point on has been copied, so one repair pass is enough
+                break;
+            }
+        }
+        if (repairedMutationList.isEmpty()) {
+            return;
+        }
+        actualMutationList.addAll(repairedMutationList);
+        Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+    }
+
+    /** Drops mutations that take no part in the comparison with the expected mutation list. */
+    private void cleanUpActualMutationList(List<Mutation> actualMutationList)
+            throws IOException {
+        Mutation lastKept = null;
+        for (Iterator<Mutation> it = actualMutationList.iterator(); it.hasNext(); ) {
+            Mutation current = it.next();
+            // Unverified puts and DeleteFamilyVersion markers are always removed
+            boolean unverifiedPut = current instanceof Put && !isVerified((Put) current);
+            if (unverifiedPut || (current instanceof Delete && isDeleteFamilyVersion(current))) {
+                it.remove();
+            } else if (lastKept != null && getTimestamp(lastKept) == getTimestamp(current)
+                    && ((lastKept instanceof Put && current instanceof Put)
+                    || (lastKept instanceof Delete && current instanceof Delete))) {
+                // Only the first of consecutive same-kind mutations sharing a timestamp is kept
+                it.remove();
+            } else {
+                lastKept = current;
+            }
+        }
+    }
+
+ /**
+ * There are two types of verification: without repair and with repair. Without-repair verification is done before
+ * or after index rebuild. It is done before index rebuild to identify the rows to be rebuilt. It is done after
+ * index rebuild to verify the rows that have been rebuilt. With-repair verification can be done anytime using
+ * the “-v ONLY” option to check the consistency of the index table. Note that with-repair verification simulates
+ * read repair in-memory for the purpose of verification, but does not actually repair the data in the index.
+ *
+ * Unverified Rows
+ *
+ * For each mutable data table mutation during regular data table updates, two operations are done on the data table.
+ * One is to read the existing row state, and the second is to update the data table for this row. The processing of
+     * concurrent data mutations is serialized once for reading the existing row states, and then serialized again
+ * for updating the data table. In other words, they go through locking twice, i.e., [lock, read, unlock] and
+ * [lock, write, unlock]. Because of this two phase locking, for a pair of concurrent mutations (for the same row),
+ * the same row state can be read from the data table. This means the same existing index row can be made unverified
+ * twice with different timestamps, one for each concurrent mutation. These unverified mutations can be repaired
+ * from the data table later during HBase scans using the index read repair process. This is one of the reasons
+ * for having extra unverified rows in the index table. The other reason is the data table write failures.
+ * When a data table write fails, it leaves an unverified index row behind. These rows are never returned to clients,
+ * instead they are repaired, which means either they are rebuilt from their data table rows or they are deleted if
+ * their data table rows do not exist.
+ *
+ * Delete Family Version Markers
+ *
+ * The family version delete markers are generated by the read repair to remove extra unverified rows. They only
+ * show up in the actual mutation list since they are not generated for regular table updates or index rebuilds.
+ * For the verification purpose, these delete markers can be treated as extra unverified rows and can be safely
+ * skipped.
+ *
+ * Delete Family Markers
+ * Delete family markers are generated during read repair, regular table updates and index rebuilds to delete index
+ * table rows. The read repair generates them to delete extra unverified rows. During regular table updates or
+ * index rebuilds, the delete family markers are used to delete index rows due to data table row deletes or
+ * data table row overwrites.
+ *
+ * Verification Algorithm
+ *
+ * IndexTool verification generates an expected list of index mutations from the data table rows and uses this list
+ * to check if index table rows are consistent with the data table.
+ *
+     * The expected list is generated using the index rebuild algorithm. This means that for a given row, the list can
+     * include a number of put and delete mutations such that the following hold:
+ *
+ * Every mutation will include a set of cells with the same timestamp
+ * Every mutation has a different timestamp
+ * A delete mutation will include only delete family cells and it is for deleting the entire row and its versions
+ * Every put mutation is verified
+ *
+     * For both verification types, after the expected list of index mutations is constructed for a given data table row,
+ * another list called the actual list of index mutations is constructed by reading the index table row using HBase
+ * raw scan and all versions of the cells of the row are retrieved.
+ *
+ * As in the construction for the expected list, the cells are grouped into a put and a delete set. The put and
+ * delete sets for a given row are further grouped based on their timestamps into put and delete mutations such that
+     * all the cells in a mutation have the same timestamp. The put and delete mutations are then merged into a single
+     * list, sorted in descending order of timestamp (deletes before puts on ties). This list is the actual list.
+ *
+ * For the without-repair verification, unverified mutations and family version delete markers are removed from
+ * the actual list and then the list is compared with the expected list.
+ *
+ * In case of the with-repair verification, the actual list is first repaired, then unverified mutations and family
+ * version delete markers are removed from the actual list and finally the list is compared with the expected list.
+ *
+ * The actual list is repaired as follows: Every unverified mutation is repaired using the method read repair uses.
+ * However, instead of going through the actual repair implementation, the expected mutations are used for repair.
+ */
+
+ @VisibleForTesting
+ public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult)
+ throws IOException {
+ // Compares one index row (verifyIndexRows reads it with a raw, all-versions scan) against the index mutations
+ // expected from the data table (indexKeyToMutationMap). Returns false, after incrementing invalidIndexRowCount,
+ // when the row does not match; otherwise returns true after incrementing validIndexRowCount or
+ // expiredIndexRowCount. Throws DoNotRetryIOException when either the expected or the actual list is missing.
+ List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow());
+ if (expectedMutationList == null) {
+ throw new DoNotRetryIOException(NO_EXPECTED_MUTATION);
+ }
+ List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow);
+ if (actualMutationList == null || actualMutationList.isEmpty()) {
+ throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+ }
+ // Order both lists the same way so they can be walked in lockstep below. Note that the expected list is the
+ // one held in indexKeyToMutationMap, so it is sorted in place.
+ Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
+ Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+ // NOTE(review): repair is applied only for IndexVerifyType.ONLY; confirm this is the intended
+ // "with-repair" verification mode described in the algorithm notes above
+ if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+ repairActualMutationList(actualMutationList, expectedMutationList);
+ }
+ // Presumably drops unverified mutations and family version delete markers, per the algorithm notes above
+ cleanUpActualMutationList(actualMutationList);
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ int actualIndex = 0;
+ int expectedIndex = 0;
+ int matchingCount = 0;
+ int expectedSize = expectedMutationList.size();
+ int actualSize = actualMutationList.size();
+ Mutation expected = null;
+ Mutation previousExpected;
+ Mutation actual;
+ // Walk both lists in lockstep; matchingCount counts the expected mutations found in the actual list
+ while (expectedIndex < expectedSize && actualIndex <actualSize) {
+ previousExpected = expected;
+ expected = expectedMutationList.get(expectedIndex);
+ // Check if cell expired as per the current server's time and data table ttl
+ // Index table should have the same ttl as the data table, hence we might not
+ // get a value back from index if it has already expired between our rebuild and
+ // verify
+ // TODO: have a metric to update for these cases
+ if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+ verificationPhaseResult.expiredIndexRowCount++;
+ return true;
+ }
+ actual = actualMutationList.get(actualIndex);
+ if (expected instanceof Put) {
+ if (previousExpected instanceof Delete) {
+ // Between an expected delete and put, there can be one or more deletes due to
+ // concurrent mutations or data table write failures. Skip all of them if any
+ while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) {
+ actualIndex++;
+ if (actualIndex == actualSize) {
+ break;
+ }
+ actual = actualMutationList.get(actualIndex);
+ }
+ if (actualIndex == actualSize) {
+ break;
+ }
+ }
+ if (isMatchingMutation(expected, actual, expectedIndex)) {
+ expectedIndex++;
+ actualIndex++;
+ matchingCount++;
+ continue;
+ }
+ } else { // expected instanceof Delete
+ // Between put and delete, delete and delete, or before first delete, there can be other deletes.
+ // Skip all of them if any
+ while (getTimestamp(actual) > getTimestamp(expected) && actual instanceof Delete) {
+ actualIndex++;
+ if (actualIndex == actualSize) {
+ break;
+ }
+ actual = actualMutationList.get(actualIndex);
+ }
+ if (actualIndex == actualSize) {
+ break;
+ }
+ // An expected delete matches an actual delete family marker with the same timestamp
+ if (getTimestamp(actual) == getTimestamp(expected) &&
+ (actual instanceof Delete && isDeleteFamily(actual))) {
+ expectedIndex++;
+ actualIndex++;
+ matchingCount++;
+ continue;
+ }
+ String errorMsg = "Delete check failure";
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+ logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+ getTimestamp(expected),
+ getTimestamp(actual), errorMsg);
+ }
+ // Reached when the current expected mutation has no counterpart in the actual list
+ verificationPhaseResult.invalidIndexRowCount++;
return false;
}
+ // One list was exhausted before the other. A partial match is tolerated (see the compaction note below);
+ // no match at all makes the row invalid.
+ if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
+ // NOTE(review): this counts every expired remaining expected mutation (not rows), on top of the
+ // valid/invalid count recorded below
+ for (; expectedIndex < expectedSize; expectedIndex++) {
+ expected = expectedMutationList.get(expectedIndex);
+ // Check if cell expired as per the current server's time and data table ttl
+ // Index table should have the same ttl as the data table, hence we might not
+ // get a value back from index if it has already expired between our rebuild and
+ // verify
+ // TODO: have a metric to update for these cases
+ if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+ verificationPhaseResult.expiredIndexRowCount++;
+ }
+ }
+ if (matchingCount > 0) {
+ if (verifyType != IndexTool.IndexVerifyType.ONLY) {
+ // We do not consider this as a verification issue but log it for further information.
+ // This may happen due to compaction
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+ String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
+ + actualMutationList.size();
+ logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+ getTimestamp(expectedMutationList.get(0)),
+ getTimestamp(actualMutationList.get(0)), errorMsg);
+ }
+ } else {
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+ String errorMsg = "Not matching index row";
+ logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+ getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
+ verificationPhaseResult.invalidIndexRowCount++;
+ return false;
+ }
+ }
+ verificationPhaseResult.validIndexRowCount++;
return true;
}
- private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
- VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
- int expectedRowCount = keys.size();
+ /**
+ * Returns the largest cell timestamp found in the put and the delete of the given pair, or 0 when both are null.
+ *
+ * @param pair the put (first) and delete (second) of a data row; either side may be null
+ * @return the maximum timestamp across both mutations
+ */
+ private static long getMaxTimestamp(Pair<Put, Delete> pair) {
+ long maxTs = 0;
+ Put put = pair.getFirst();
+ if (put != null) {
+ maxTs = Math.max(maxTs, getMaxTimestamp(put));
+ }
+ Delete del = pair.getSecond();
+ if (del != null) {
+ // Combine with the put's timestamp instead of overwriting it, so an older delete cannot hide a newer put
+ maxTs = Math.max(maxTs, getMaxTimestamp(del));
+ }
+ return maxTs;
+ }
+
+ private void verifyIndexRows(List<KeyRange> keys,
+ IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+ // Verifies the index rows for the given point-lookup keys and records the outcome in verificationPhaseResult.
+ // On return, keys holds only the keys of rows found invalid or missing (expired rows are dropped); the caller
+ // reuses these lists to pick the data rows that need to be rebuilt.
+ List<KeyRange> invalidKeys = new ArrayList<>();
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
Scan indexScan = new Scan();
indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
scanRanges.initializeScan(indexScan);
+ // NOTE(review): the skip scan filter below is commented out; rows outside the requested keys are filtered by
+ // the keys.contains() check in the loop instead. Remove the dead code or restore the filter.
+ /*
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
- indexScan.setFilter(skipScanFilter);
- int rowCount = 0;
+ indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+ */
+ // Raw scan with all versions so that delete markers and older index row versions are visible
+ indexScan.setRaw(true);
+ indexScan.setMaxVersions();
try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
- Put dataPut = indexKeyToDataPutMap.get(result.getRow());
- if (dataPut == null) {
- // This should never happen
- String errorMsg = "Missing data row";
- logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
- exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
- throw new IOException(exceptionMessage);
+ // Ignore rows returned by the scan that are not among the requested keys
+ KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow());
+ if (!keys.contains(keyRange)) {
+ continue;
}
- if (verifySingleIndexRow(result, dataPut)) {
- verificationPhaseResult.validIndexRowCount++;
- perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
- } else {
- verificationPhaseResult.invalidIndexRowCount++;
+ if (!verifySingleIndexRow(result, verificationPhaseResult)) {
+ invalidKeys.add(keyRange);
}
- rowCount++;
+ // Every scanned key leaves the list; what remains afterwards was not returned by the scan.
+ // NOTE(review): List.contains/remove make this loop quadratic in the number of keys per task
+ keys.remove(keyRange);
}
} catch (Throwable t) {
ServerUtil.throwIOException(indexHTable.getName().toString(), t);
}
// Check if any expected rows from index(which we didn't get) are already expired due to TTL
// TODO: metrics for expired rows
- if (!perTaskDataKeyToDataPutMap.isEmpty()) {
- Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
+ if (!keys.isEmpty()) {
+ Iterator<KeyRange> itr = keys.iterator();
long currentTime = EnvironmentEdgeManager.currentTime();
while(itr.hasNext()) {
- Entry<byte[], Put> entry = itr.next();
- long ts = getMaxTimestamp(entry.getValue());
- if (isTimestampBeforeTTL(currentTime, ts)) {
+ KeyRange keyRange = itr.next();
+ byte[] key = keyRange.getLowerRange();
+ // Assumes the last mutation of the per-row list carries the newest timestamp (the list is built in
+ // ascending timestamp order) - TODO confirm
+ List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+ if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
itr.remove();
- rowCount++;
verificationPhaseResult.expiredIndexRowCount++;
}
}
}
- if (rowCount != expectedRowCount) {
- for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
+ // Keys still left were neither returned by the scan nor expired: report them as missing index rows
+ if (keys.size() > 0) {
+ for (KeyRange keyRange : keys) {
String errorMsg = "Missing index row";
- logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
- 0, errorMsg);
+ byte[] key = keyRange.getLowerRange();
+ List<Mutation> mutationList = indexKeyToMutationMap.get(key);
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+ // NOTE(review): assumes dataKeyToMutationMap has an entry for every data key rebuilt from an index key
+ logToIndexToolOutputTable(dataKey,
+ keyRange.getLowerRange(),
+ getMaxTimestamp(dataKeyToMutationMap.get(dataKey)),
+ getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg);
}
- verificationPhaseResult.missingIndexRowCount += expectedRowCount - rowCount;
+ verificationPhaseResult.missingIndexRowCount += keys.size();
}
+ // Hand the invalid keys back to the caller together with the missing ones
+ keys.addAll(invalidKeys);
}
private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
@@ -815,8 +981,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
}
- private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
- final VerificationResult.PhaseResult verificationPhaseResult) {
+ private void addVerifyTask(final List<KeyRange> keys,
+ final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
tasks.add(new Task<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -825,7 +991,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
- verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
+ verifyIndexRows(keys, verificationPhaseResult);
} catch (Exception e) {
throw e;
}
@@ -834,33 +1000,27 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
});
}
- private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
- for (Mutation mutation : mutations) {
- indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
- }
- int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
+ private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+ int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask;
tasks = new TaskBatch<>(taskCount);
- List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
- List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+ List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount);
+ List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
- Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
- dataPutMapList.add(perTaskDataKeyToDataPutMap);
- VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+ listOfKeyRangeList.add(keys);
+ IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
- for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
- keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
- perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
+ for (byte[] indexKey: indexKeyToMutationMap.keySet()) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
if (keys.size() == rowCountPerTask) {
- addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+ addVerifyTask(keys, perTaskVerificationPhaseResult);
keys = new ArrayList<>(rowCountPerTask);
- perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
- dataPutMapList.add(perTaskDataKeyToDataPutMap);
- perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+ listOfKeyRangeList.add(keys);
+ perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
}
}
if (keys.size() > 0) {
- addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+ addVerifyTask(keys, perTaskVerificationPhaseResult);
}
List<Boolean> taskResultList = null;
try {
@@ -877,13 +1037,19 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
throw new IOException(exceptionMessage);
}
}
+ for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
+ verificationPhaseResult.add(result);
+ }
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
- for (Map<byte[], Put> dataPutMap : dataPutMapList) {
- dataKeyToDataPutMap.putAll(dataPutMap);
+ Map<byte[], Pair<Put, Delete>> newDataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ for (List<KeyRange> keyRangeList : listOfKeyRangeList) {
+ for (KeyRange keyRange : keyRangeList) {
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
+ newDataKeyToMutationMap.put(dataKey, dataKeyToMutationMap.get(dataKey));
+ }
}
- }
- for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
- verificationPhaseResult.add(result);
+ dataKeyToMutationMap.clear();
+ dataKeyToMutationMap = newDataKeyToMutationMap;
}
}
@@ -891,16 +1057,21 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
byte[] uuidValue = ServerCacheClient.generateId();
UngroupedAggregateRegionObserver.MutationList currentMutationList =
new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+ Put put = null;
for (Mutation mutation : mutationList) {
- Put put = (Put) mutation;
- currentMutationList.add(mutation);
- setMutationAttributes(put, uuidValue);
- uuidValue = commitIfReady(uuidValue, currentMutationList);
- Delete deleteMarkers = generateDeleteMarkers(put);
- if (deleteMarkers != null) {
- setMutationAttributes(deleteMarkers, uuidValue);
- currentMutationList.add(deleteMarkers);
+ if (mutation instanceof Put) {
+ if (put != null) {
+ // back to back put, i.e., no delete in between. we can commit the previous put
+ uuidValue = commitIfReady(uuidValue, currentMutationList);
+ }
+ currentMutationList.add(mutation);
+ setMutationAttributes(mutation, uuidValue);
+ put = (Put)mutation;
+ } else {
+ currentMutationList.add(mutation);
+ setMutationAttributes(mutation, uuidValue);
uuidValue = commitIfReady(uuidValue, currentMutationList);
+ put = null;
}
}
if (!currentMutationList.isEmpty()) {
@@ -910,12 +1081,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
private void verifyAndOrRebuildIndex() throws IOException {
- VerificationResult nextVerificationResult = new VerificationResult();
- nextVerificationResult.scannedDataRowCount = mutations.size();
+ IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult();
+ nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
// For these options we start with rebuilding index rows
rebuildIndexRows(mutations);
- nextVerificationResult.rebuiltIndexRowCount = mutations.size();
+ nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size();
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.NONE) {
@@ -923,77 +1094,350 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
- VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+ IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
// For these options we start with verifying index rows
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.before.add(verificationPhaseResult);
- if (mutations.size() != verificationPhaseResult.getTotalCount()) {
- throw new DoNotRetryIOException(
- "mutations.size() != verificationPhaseResult.getTotalCount() at the before phase " +
- nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
- }
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
// For these options, we have identified the rows to be rebuilt and now need to rebuild them
// At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt
mutations.clear();
- for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) {
- mutations.add(entry.getValue());
+
+ for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
+ if (entry.getValue().getFirst() != null) {
+ mutations.add(entry.getValue().getFirst());
+ }
+ if (entry.getValue().getSecond() != null) {
+ mutations.add(entry.getValue().getSecond());
+ }
}
rebuildIndexRows(mutations);
- nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+ nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size();
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
// We have rebuilt index row and now we need to verify them
- indexKeyToDataPutMap.clear();
- VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+ IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
+ indexKeyToMutationMap.clear();
+ for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
+ prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
+ }
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.after.add(verificationPhaseResult);
- if (mutations.size() != verificationPhaseResult.getTotalCount()) {
- throw new DoNotRetryIOException(
- "mutations.size() != verificationPhaseResult.getTotalCount() at the after phase " +
- nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
- }
}
- indexKeyToDataPutMap.clear();
verificationResult.add(nextVerificationResult);
}
+ /**
+ * Tells whether the given cell's column was requested by the scan's family map. A family that is present but
+ * maps to a null or empty qualifier set selects all of its columns.
+ */
+ private boolean isColumnIncluded(Cell cell) {
+ byte[] cf = CellUtil.cloneFamily(cell);
+ if (familyMap.containsKey(cf)) {
+ NavigableSet<byte[]> qualifiers = familyMap.get(cf);
+ return qualifiers == null || qualifiers.isEmpty()
+ || qualifiers.contains(CellUtil.cloneQualifier(cell));
+ }
+ return false;
+ }
+
+ /**
+ * Returns the timestamp of the first cell found in the mutation. Intended for mutations whose cells all share
+ * one timestamp.
+ *
+ * @throws IllegalStateException if the mutation has no cells
+ */
+ public static long getTimestamp(Mutation m) {
+ Iterator<List<Cell>> families = m.getFamilyCellMap().values().iterator();
+ while (families.hasNext()) {
+ List<Cell> familyCells = families.next();
+ if (!familyCells.isEmpty()) {
+ return familyCells.get(0).getTimestamp();
+ }
+ }
+ throw new IllegalStateException("No cell found");
+ }
+
+ /**
+ * Orders mutations by ascending timestamp; for equal timestamps a put is ordered before a delete. Any other
+ * pair with equal timestamps compares as equal.
+ */
+ public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new Comparator<Mutation>() {
+ @Override
+ public int compare(Mutation o1, Mutation o2) {
+ int byTimestamp = Long.compare(getTimestamp(o1), getTimestamp(o2));
+ if (byTimestamp != 0) {
+ return byTimestamp;
+ }
+ boolean putThenDelete = o1 instanceof Put && o2 instanceof Delete;
+ boolean deleteThenPut = o1 instanceof Delete && o2 instanceof Put;
+ return putThenDelete ? -1 : (deleteThenPut ? 1 : 0);
+ }
+ };
+
+ /**
+ * Splits the given data put and delete into single-timestamp mutations and sorts them in ascending order of
+ * timestamp, placing a put before a delete that has the same timestamp (MUTATION_TS_COMPARATOR).
+ *
+ * @param put the put cells of a data row, or null
+ * @param del the delete markers of the same data row, or null
+ * @return a new mutable list of the resulting mutations
+ */
+ public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
+ List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
+ if (put != null) {
+ mutationList.add(put);
+ }
+ if (del != null) {
+ mutationList.add(del);
+ }
+ // Group the cells within a mutation based on their timestamps and create a separate mutation for each group.
+ // Copy into a new list instead of casting the returned collection, so the sort below always gets a mutable List
+ List<Mutation> flattened =
+ new ArrayList<Mutation>(IndexManagementUtil.flattenMutationsByTimestamp(mutationList));
+ // Reorder the mutations so that a put comes before a delete when they have the same timestamp
+ Collections.sort(flattened, MUTATION_TS_COMPARATOR);
+ return flattened;
+ }
+
+ /**
+ * Builds the index put for one data row state at timestamp ts and marks it verified through the empty column.
+ * When the index maintainer produces no update (no covered column), an index row holding only the empty column
+ * is built instead.
+ */
+ private static Put prepareIndexPutForRebuid(IndexMaintainer indexMaintainer, ImmutableBytesPtr rowKeyPtr,
+ ValueGetter mergedRowVG, long ts)
+ throws IOException {
+ Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ mergedRowVG, rowKeyPtr, ts, null, null);
+ if (indexPut != null) {
+ // Drop any empty column in the generated put; the verified empty column is added below
+ removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier());
+ } else {
+ // No covered column. Just prepare an index row with the empty column
+ indexPut = new Put(indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
+ null, null, HConstants.LATEST_TIMESTAMP));
+ }
+ indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
+ return indexPut;
+ }
+
+ /**
+ * Removes from the put the first cell of deleteCell's family whose qualifier matches deleteCell. The family
+ * entry is dropped altogether once its cell list becomes empty.
+ */
+ public static void removeColumn(Put put, Cell deleteCell) {
+ byte[] family = CellUtil.cloneFamily(deleteCell);
+ List<Cell> familyCells = put.getFamilyCellMap().get(family);
+ if (familyCells != null) {
+ for (int idx = 0; idx < familyCells.size(); idx++) {
+ if (CellUtil.matchingQualifier(familyCells.get(idx), deleteCell)) {
+ familyCells.remove(idx);
+ if (familyCells.isEmpty()) {
+ put.getFamilyCellMap().remove(family);
+ }
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Copies into destination every cell of source whose column (family and qualifier) destination does not
+ * already have. Cells already in destination are never overwritten.
+ */
+ public static void apply(Put destination, Put source) throws IOException {
+ for (Map.Entry<byte[], List<Cell>> familyEntry : source.getFamilyCellMap().entrySet()) {
+ for (Cell cell : familyEntry.getValue()) {
+ boolean alreadyPresent = destination.has(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+ if (!alreadyPresent) {
+ destination.add(cell);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns a copy of destination extended with the cells of source for columns destination does not have.
+ * Neither argument is modified.
+ */
+ public static Put applyNew(Put destination, Put source) throws IOException {
+ Put merged = new Put(destination);
+ apply(merged, source);
+ return merged;
+ }
+
+ /**
+ * Applies the delete markers of del on put. A DeleteFamily marker removes the whole family and a DeleteColumn
+ * marker removes the matching column. Marker timestamps are not compared with cell timestamps.
+ *
+ * @throws DoNotRetryIOException for any other delete marker type
+ */
+ private static void applyDeleteOnPut(Delete del, Put put) throws IOException {
+ for (List<Cell> markers : del.getFamilyCellMap().values()) {
+ for (Cell marker : markers) {
+ KeyValue.Type markerType = KeyValue.Type.codeToType(marker.getTypeByte());
+ if (markerType == KeyValue.Type.DeleteFamily) {
+ put.getFamilyCellMap().remove(CellUtil.cloneFamily(marker));
+ } else if (markerType == KeyValue.Type.DeleteColumn) {
+ removeColumn(put, marker);
+ } else {
+ // We do not expect this can happen
+ throw new DoNotRetryIOException("Single version delete marker in data mutation " +
+ del);
+ }
+ }
+ }
+ }
+
+ /**
+ * Generates the index mutations that rebuild the index rows of a single data table row. This method is called
+ * only for global indexes.
+ *
+ * The data put and delete are first split into single-timestamp mutations, sorted in ascending order of timestamp
+ * with a put ordered before a delete of the same timestamp (getMutationsWithSameTS). The data row state is then
+ * replayed mutation by mutation. Each resulting non-empty state yields a verified index put at that timestamp.
+ * When the index row key changes, or the data row becomes empty, an ALL_VERSIONS row delete for the previous
+ * index row key is generated at the same timestamp.
+ *
+ * A delete that has the same timestamp as a put masks that put's cells. If it masks all of them, the delete is
+ * then applied to the current data row state exactly like a standalone delete.
+ *
+ * @param indexMaintainer maintainer of the index being rebuilt
+ * @param dataPut the put cells of the data row, or null
+ * @param dataDel the delete markers of the data row, or null (at least one of the two must be non-null)
+ * @return the generated index mutations, in generation order
+ * @throws IOException if a data delete marker other than DeleteFamily or DeleteColumn is found
+ */
+ public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
+ Put dataPut, Delete dataDel) throws IOException {
+ List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, dataDel);
+ List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
+ // The row key ptr of the data table row for which we will build index rows here
+ ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new ImmutableBytesPtr(dataPut.getRow()) :
+ new ImmutableBytesPtr(dataDel.getRow());
+ // Start with empty data table row
+ Put currentDataRowState = null;
+ // The index row key corresponding to the current data row
+ byte[] indexRowKeyForCurrentDataRow = null;
+ int dataMutationListSize = dataMutations.size();
+ for (int i = 0; i < dataMutationListSize; i++) {
+ Mutation mutation = dataMutations.get(i);
+ long ts = getTimestamp(mutation);
+ // The delete, if any, that still has to be applied on the current data row state at this timestamp
+ Delete deleteToApply = null;
+ if (mutation instanceof Put) {
+ Put put = (Put) mutation;
+ Delete sameTsDelete = null;
+ if (i < dataMutationListSize - 1) {
+ Mutation nextMutation = dataMutations.get(i + 1);
+ if (getTimestamp(nextMutation) == ts && nextMutation instanceof Delete) {
+ sameTsDelete = (Delete) nextMutation;
+ // This increment is to skip the next (delete) mutation as it is processed with this put
+ i++;
+ }
+ }
+ if (sameTsDelete != null) {
+ // A delete masks the cells of a put with the same timestamp. If it deletes all the cells in the put,
+ // the family cell map of the put becomes empty
+ applyDeleteOnPut(sameTsDelete, put);
+ }
+ if (put.getFamilyCellMap().size() != 0) {
+ // Keep the current index row key even if the delete below empties the current data row state, so the
+ // stale index row is still deleted when the new index row key differs from it
+ byte[] previousIndexRowKey = indexRowKeyForCurrentDataRow;
+ if (sameTsDelete != null && currentDataRowState != null) {
+ applyDeleteOnPut(sameTsDelete, currentDataRowState);
+ if (currentDataRowState.getFamilyCellMap().size() == 0) {
+ currentDataRowState = null;
+ }
+ }
+ // Add this put on top of the current data row state to get the next data row state
+ Put nextDataRow = (currentDataRowState == null) ? new Put(put) : applyNew(put, currentDataRowState);
+ ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+ Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+ indexMutations.add(indexPut);
+ // Delete the previous index row if the new index key is different than the previous one
+ if (previousIndexRowKey != null && Bytes.compareTo(indexPut.getRow(), previousIndexRowKey) != 0) {
+ Mutation del = indexMaintainer.buildRowDeleteMutation(previousIndexRowKey,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+ indexMutations.add(del);
+ }
+ // For the next iteration of the for loop
+ currentDataRowState = nextDataRow;
+ indexRowKeyForCurrentDataRow = indexPut.getRow();
+ continue;
+ }
+ // The put was fully masked by the delete with the same timestamp. That delete may still remove the whole
+ // data row or only some of its columns, so apply it on the current data row state like any other delete
+ deleteToApply = sameTsDelete;
+ } else { // mutation instanceof Delete
+ deleteToApply = (Delete) mutation;
+ }
+ if (deleteToApply != null && currentDataRowState != null) {
+ // We apply delete mutations only on the current data row state to obtain the next data row state.
+ // For the index table, we are only interested in if the index row should be deleted or not.
+ // There is no need to apply column deletes to index rows since index rows are always full rows
+ // and all the cells in an index row have the same timestamp value. Because of this index rows
+ // versions do not share cells.
+ applyDeleteOnPut(deleteToApply, currentDataRowState);
+ Put nextDataRowState = currentDataRowState;
+ if (nextDataRowState.getFamilyCellMap().size() == 0) {
+ Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+ indexMutations.add(del);
+ currentDataRowState = null;
+ indexRowKeyForCurrentDataRow = null;
+ } else {
+ ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+ Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
+ indexMutations.add(indexPut);
+ // Delete the current index row if the new index key is different than the current one
+ if (indexRowKeyForCurrentDataRow != null) {
+ if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+ Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+ indexMutations.add(del);
+ }
+ }
+ indexRowKeyForCurrentDataRow = indexPut.getRow();
+ }
+ }
+ }
+ return indexMutations;
+ }
+
+ /**
+ * Generates the index mutations for one data row with prepareIndexMutationsForRebuild and appends them, grouped by
+ * index row key, to indexKeyToMutationMap.
+ *
+ * @return always 0
+ */
+ @VisibleForTesting
+ public int prepareIndexMutations(Put put, Delete del) throws IOException {
+ for (Mutation indexMutation : prepareIndexMutationsForRebuild(indexMaintainer, put, del)) {
+ byte[] rowKey = indexMutation.getRow();
+ List<Mutation> rowMutations = indexKeyToMutationMap.get(rowKey);
+ if (rowMutations == null) {
+ rowMutations = new ArrayList<>();
+ indexKeyToMutationMap.put(rowKey, rowMutations);
+ }
+ rowMutations.add(indexMutation);
+ }
+ return 0;
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
+ if (indexRowKey != null &&
+ singleRowRebuildReturnCode == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
+ byte[] rowCountBytes =
+ PLong.INSTANCE.toBytes(Long.valueOf(singleRowRebuildReturnCode));
+ final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+ SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+ results.add(aggKeyValue);
+ return false;
+ }
Cell lastCell = null;
int rowCount = 0;
region.startRegionOperation();
try {
- // Partial rebuilds by MetadataRegionObserver use raw scan. Inline verification is not supported for them
- boolean partialRebuild = scan.isRaw();
byte[] uuidValue = ServerCacheClient.generateId();
synchronized (innerScanner) {
do {
List<Cell> row = new ArrayList<Cell>();
hasMore = innerScanner.nextRaw(row);
if (!row.isEmpty()) {
- lastCell = row.get(0);
+ lastCell = row.get(0); // lastCell is any cell from the last visited row
Put put = null;
Delete del = null;
for (Cell cell : row) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (!partialRebuild && familyMap != null && !isColumnIncluded(cell)) {
+ continue;
+ }
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
- mutations.add(put);
}
put.add(cell);
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
- mutations.add(del);
}
del.addDeleteMarker(cell);
}
}
- if (partialRebuild) {
+ if (put == null && del == null) {
+ continue;
+ }
+ // Always add the put first and then delete for a given row. This simplifies the logic in
+ // IndexRegionObserver
+ if (put != null) {
+ mutations.add(put);
+ }
+ if (del != null) {
+ mutations.add(del);
+ }
+ if (!verify) {
if (put != null) {
setMutationAttributes(put, uuidValue);
}
@@ -1001,35 +1445,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
setMutationAttributes(del, uuidValue);
}
uuidValue = commitIfReady(uuidValue, mutations);
- }
- if (indexRowKey != null) {
- if (put != null) {
- setMutationAttributes(put, uuidValue);
- }
- Delete deleteMarkers = generateDeleteMarkers(put);
- if (deleteMarkers != null) {
- setMutationAttributes(deleteMarkers, uuidValue);
- mutations.add(deleteMarkers);
- uuidValue = commitIfReady(uuidValue, mutations);
- }
- // GlobalIndexChecker passed the index row key. This is to build a single index row.
- // Check if the data table row we have just scanned matches with the index row key.
- // If not, there is no need to build the index row from this data table row,
- // and just return zero row count.
- if (checkIndexRow(indexRowKey, put)) {
- rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
- } else {
- rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
- }
- break;
+ } else {
+ byte[] dataKey = (put != null) ? put.getRow() : del.getRow();
+ prepareIndexMutations(put, del);
+ dataKeyToMutationMap.put(dataKey, new Pair<Put, Delete>(put, del));
}
rowCount++;
}
} while (hasMore && rowCount < pageSizeInRows);
- if (!partialRebuild && indexRowKey == null) {
- verifyAndOrRebuildIndex();
- } else {
- if (!mutations.isEmpty()) {
+ if (!mutations.isEmpty()) {
+ if (verify) {
+ verifyAndOrRebuildIndex();
+ } else {
ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
}
@@ -1042,10 +1469,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
region.closeRegionOperation();
mutations.clear();
if (verify) {
- indexKeyToDataPutMap.clear();
- dataKeyToDataPutMap.clear();
+ dataKeyToMutationMap.clear();
+ indexKeyToMutationMap.clear();
}
}
+ if (indexRowKey != null) {
+ rowCount = singleRowRebuildReturnCode;
+ }
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
final Cell aggKeyValue;
if (lastCell == null) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
new file mode 100644
index 0000000..ed92fad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+
+/**
+ * Counters describing an index verification and/or rebuild run.
+ *
+ * <p>{@link #scannedDataRowCount} and {@link #rebuiltIndexRowCount} cover the whole run, while
+ * {@link #before} and {@link #after} hold the per-phase counts of valid, expired, missing and
+ * invalid index rows observed before and after the index rebuild. Instances can be summed with
+ * {@link #add(IndexToolVerificationResult)} or loaded from the IndexTool result table with
+ * {@link #getVerificationResult(Table, long)}.
+ */
+public class IndexToolVerificationResult {
+    /**
+     * Counts of index rows found to be valid, expired, missing or invalid during one verification
+     * phase. The counters are package-private fields.
+     */
+    public static class PhaseResult {
+        long validIndexRowCount = 0;
+        long expiredIndexRowCount = 0;
+        long missingIndexRowCount = 0;
+        long invalidIndexRowCount = 0;
+
+        /** Adds each counter of {@code phaseResult} to the matching counter of this instance. */
+        public void add(PhaseResult phaseResult) {
+            validIndexRowCount += phaseResult.validIndexRowCount;
+            expiredIndexRowCount += phaseResult.expiredIndexRowCount;
+            missingIndexRowCount += phaseResult.missingIndexRowCount;
+            invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+        }
+
+        /** Creates a phase result with all counters at zero. */
+        public PhaseResult() {}
+
+        /** Creates a phase result with the given counter values. */
+        public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
+                long missingIndexRowCount, long invalidIndexRowCount) {
+            this.validIndexRowCount = validIndexRowCount;
+            this.expiredIndexRowCount = expiredIndexRowCount;
+            this.missingIndexRowCount = missingIndexRowCount;
+            this.invalidIndexRowCount = invalidIndexRowCount;
+        }
+
+        /** Returns the sum of the valid, expired, missing and invalid counters. */
+        public long getTotalCount() {
+            return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+        }
+
+        @Override
+        public String toString() {
+            return "PhaseResult{" +
+                    "validIndexRowCount=" + validIndexRowCount +
+                    ", expiredIndexRowCount=" + expiredIndexRowCount +
+                    ", missingIndexRowCount=" + missingIndexRowCount +
+                    ", invalidIndexRowCount=" + invalidIndexRowCount +
+                    '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            // instanceof is false for null, so this also covers the null case
+            if (!(o instanceof PhaseResult)) {
+                return false;
+            }
+            PhaseResult pr = (PhaseResult) o;
+            return this.expiredIndexRowCount == pr.expiredIndexRowCount
+                    && this.validIndexRowCount == pr.validIndexRowCount
+                    && this.invalidIndexRowCount == pr.invalidIndexRowCount
+                    && this.missingIndexRowCount == pr.missingIndexRowCount;
+        }
+
+        @Override
+        public int hashCode() {
+            // Built from exactly the four counters compared by equals()
+            long result = 17;
+            result = 31 * result + expiredIndexRowCount;
+            result = 31 * result + validIndexRowCount;
+            result = 31 * result + missingIndexRowCount;
+            result = 31 * result + invalidIndexRowCount;
+            return (int) result;
+        }
+    }
+
+    // Whole-run counters (package-private, like the PhaseResult counters)
+    long scannedDataRowCount = 0;
+    long rebuiltIndexRowCount = 0;
+    // Per-phase counts observed before and after the index rebuild
+    PhaseResult before = new PhaseResult();
+    PhaseResult after = new PhaseResult();
+
+    @Override
+    public String toString() {
+        return "VerificationResult{" +
+                "scannedDataRowCount=" + scannedDataRowCount +
+                ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
+                ", before=" + before +
+                ", after=" + after +
+                '}';
+    }
+
+    // ---- Read accessors --------------------------------------------------------------------
+
+    public long getScannedDataRowCount() {
+        return scannedDataRowCount;
+    }
+
+    public long getRebuiltIndexRowCount() {
+        return rebuiltIndexRowCount;
+    }
+
+    public long getBeforeRebuildValidIndexRowCount() {
+        return before.validIndexRowCount;
+    }
+
+    public long getBeforeRebuildExpiredIndexRowCount() {
+        return before.expiredIndexRowCount;
+    }
+
+    public long getBeforeRebuildInvalidIndexRowCount() {
+        return before.invalidIndexRowCount;
+    }
+
+    public long getBeforeRebuildMissingIndexRowCount() {
+        return before.missingIndexRowCount;
+    }
+
+    public long getAfterRebuildValidIndexRowCount() {
+        return after.validIndexRowCount;
+    }
+
+    public long getAfterRebuildExpiredIndexRowCount() {
+        return after.expiredIndexRowCount;
+    }
+
+    public long getAfterRebuildInvalidIndexRowCount() {
+        return after.invalidIndexRowCount;
+    }
+
+    public long getAfterRebuildMissingIndexRowCount() {
+        return after.missingIndexRowCount;
+    }
+
+    // ---- Private accumulators used by update(Cell) -----------------------------------------
+
+    private void addScannedDataRowCount(long count) {
+        this.scannedDataRowCount += count;
+    }
+
+    private void addRebuiltIndexRowCount(long count) {
+        this.rebuiltIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildValidIndexRowCount(long count) {
+        before.validIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildExpiredIndexRowCount(long count) {
+        before.expiredIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildMissingIndexRowCount(long count) {
+        before.missingIndexRowCount += count;
+    }
+
+    private void addBeforeRebuildInvalidIndexRowCount(long count) {
+        before.invalidIndexRowCount += count;
+    }
+
+    private void addAfterRebuildValidIndexRowCount(long count) {
+        after.validIndexRowCount += count;
+    }
+
+    private void addAfterRebuildExpiredIndexRowCount(long count) {
+        after.expiredIndexRowCount += count;
+    }
+
+    private void addAfterRebuildMissingIndexRowCount(long count) {
+        after.missingIndexRowCount += count;
+    }
+
+    private void addAfterRebuildInvalidIndexRowCount(long count) {
+        after.invalidIndexRowCount += count;
+    }
+
+    /**
+     * Parses the cell value, which is stored as the decimal string form of a long.
+     *
+     * @throws NumberFormatException if the value is not a decimal long
+     */
+    private long getValue(Cell cell) {
+        return Long.parseLong(Bytes.toString(cell.getValueArray(),
+                cell.getValueOffset(), cell.getValueLength()));
+    }
+
+    /**
+     * Adds the value of {@code cell} to the counter named by its column in
+     * {@code RESULT_TABLE_COLUMN_FAMILY}. Cells of any other column are ignored.
+     */
+    private void update(Cell cell) {
+        if (CellUtil
+                .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
+            addScannedDataRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
+            addRebuiltIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildValidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildExpiredIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildValidIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildExpiredIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildMissingIndexRowCount(getValue(cell));
+        } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+            addAfterRebuildInvalidIndexRowCount(getValue(cell));
+        }
+    }
+
+    /**
+     * Returns the smallest row key that is greater than every key starting with
+     * {@code rowKeyPrefix}, for use as an exclusive scan stop row.
+     *
+     * <p>Trailing 0xFF bytes are dropped and the last remaining byte is incremented. If the prefix is
+     * empty or consists only of 0xFF bytes, {@link HConstants#EMPTY_END_ROW} is returned, meaning
+     * "scan to the end of the table".
+     */
+    public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+        // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+        // Search for the place where the trailing 0xFFs start
+        int offset = rowKeyPrefix.length;
+        while (offset > 0) {
+            if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+                break;
+            }
+            offset--;
+        }
+        if (offset == 0) {
+            // We got an 0xFFFF... (only FFs) stopRow value which is
+            // the last possible prefix before the end of the table.
+            // So set it to stop at the 'end of the table'
+            return HConstants.EMPTY_END_ROW;
+        }
+        // Copy the right length of the original
+        byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+        // And increment the last one
+        newStopRow[newStopRow.length - 1]++;
+        return newStopRow;
+    }
+
+    /**
+     * Sums the counters stored in {@code hTable} for the run identified by {@code ts}.
+     *
+     * <p>Scans every row whose key starts with the decimal string of {@code ts} and adds each
+     * recognized counter column to a fresh result. NOTE(review): this is a textual prefix match, so a
+     * ts whose decimal string is a prefix of another's (e.g. 123 vs 1234) would also match that
+     * run's rows; confirm that row keys cannot collide this way.
+     *
+     * @param hTable the IndexTool result table
+     * @param ts the run timestamp used as the row-key prefix
+     * @return the aggregated verification result (all zero if no row matches)
+     * @throws IOException if the scan fails
+     */
+    public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts)
+            throws IOException {
+        IndexToolVerificationResult verificationResult = new IndexToolVerificationResult();
+        byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+        byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
+        Scan scan = new Scan();
+        scan.setStartRow(startRowKey);
+        scan.setStopRow(stopRowKey);
+        // The scanner holds server-side resources; always release it, even if update() throws
+        try (ResultScanner scanner = hTable.getScanner(scan)) {
+            for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                for (Cell cell : result.rawCells()) {
+                    verificationResult.update(cell);
+                }
+            }
+        }
+        return verificationResult;
+    }
+
+    /**
+     * Reports whether verification failed for the given verify type.
+     *
+     * <p>BEFORE and NONE never fail. ONLY fails when the before-phase invalid plus missing count is
+     * positive. BOTH and AFTER fail when the after-phase invalid plus missing count is positive.
+     * Expired rows never cause a failure, and any other value (including null) returns false.
+     */
+    public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
+            return false;
+        } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
+                return true;
+            }
+        } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+            if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Adds every counter of {@code verificationResult}, including both phases, into this instance. */
+    public void add(IndexToolVerificationResult verificationResult) {
+        scannedDataRowCount += verificationResult.scannedDataRowCount;
+        rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
+        before.add(verificationResult.before);
+        after.add(verificationResult.after);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 566fb59..0180368 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1112,9 +1112,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
+ /**
+ * Wraps the region scan in an {@link IndexRebuildRegionScanner}.
+ * <p>
+ * If {@code scan} is not raw, {@code innerScanner} is closed and replaced by a new region scanner.
+ * That scanner is built from a copy of {@code scan} that is raw, reads all versions, has no filter,
+ * and selects whole column families. It uses the same families as {@code scan}, drops any qualifier
+ * restrictions, and leaves an empty family selection empty.
+ * The original {@code scan}, not the raw copy, is still passed to the IndexRebuildRegionScanner.
+ * NOTE(review): presumably the raw, all-versions view is needed so the rebuild sees every row
+ * version and delete marker; confirm.
+ */
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env) throws IOException {
-
- RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
- return scanner;
+ if (!scan.isRaw()) {
+ Scan rawScan = new Scan(scan);
+ rawScan.setRaw(true);
+ rawScan.setMaxVersions();
+ // Reset the column selection to whole families only (qualifiers are dropped)
+ rawScan.getFamilyMap().clear();
+ rawScan.setFilter(null);
+ for (byte[] family : scan.getFamilyMap().keySet()) {
+ rawScan.addFamily(family);
+ }
+ innerScanner.close();
+ RegionScanner scanner = region.getScanner(rawScan);
+ return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
+ }
+ return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
}
private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index c4ae93d..7bfefb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,13 +40,16 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -55,36 +59,49 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.LockManager.RowLock;
import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException;
import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
-import com.google.common.collect.Lists;
+import java.util.Set;
+
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild;
+import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
/**
* Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed
@@ -159,14 +176,19 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
private boolean rebuild;
+ // The current and next states of the data rows corresponding to the pending mutations
+ private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
+ // Data table pending mutations
+ private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
+
private BatchMutateContext(int clientVersion) {
this.clientVersion = clientVersion;
}
}
-
+
private ThreadLocal<BatchMutateContext> batchMutateContext =
new ThreadLocal<BatchMutateContext>();
-
+
/** Configuration key for the {@link IndexBuilder} to use */
public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
@@ -176,17 +198,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
*/
public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
- private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
-
public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write";
private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
- private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
- private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000;
- private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
- private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
@@ -349,13 +365,9 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
public static long getMaxTimestamp(Mutation m) {
long maxTs = 0;
- long ts = 0;
- Iterator iterator = m.getFamilyCellMap().entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<byte[], List<Cell>> entry = (Map.Entry) iterator.next();
- Iterator<Cell> cellIterator = entry.getValue().iterator();
- while (cellIterator.hasNext()) {
- Cell cell = cellIterator.next();
+ long ts;
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
ts = cell.getTimestamp();
if (ts > maxTs) {
maxTs = ts;
@@ -409,316 +421,550 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
}
}
- private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
- long now, ReplayWrite replayWrite) throws IOException {
- Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
- boolean copyMutations = false;
- for (int i = 0; i < miniBatchOp.size(); i++) {
- if (miniBatchOp.getOperationStatus(i) == IGNORE) {
- continue;
- }
- Mutation m = miniBatchOp.getOperation(i);
- if (this.builder.isEnabled(m)) {
- // Track whether or not we need to
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- if (mutationsMap.containsKey(row)) {
- copyMutations = true;
- } else {
- mutationsMap.put(row, null);
- }
- }
- }
- // early exit if it turns out we don't have any edits
- if (mutationsMap.isEmpty()) {
- return null;
- }
- // If we're copying the mutations
- Collection<Mutation> originalMutations;
- Collection<? extends Mutation> mutations;
- if (copyMutations) {
- originalMutations = null;
- mutations = mutationsMap.values();
- } else {
- originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
- mutations = originalMutations;
- }
+ /**
+ * Groups the batch's mutations by row key into one combined {@link MultiMutation} per data row.
+ * <p>
+ * Only mutations whose operation status is not IGNORE and for which the index builder is enabled
+ * are included. The per-row map is stored in {@code context.multiMutationMap}, replacing any map
+ * stored there earlier, and this method returns that map's {@code values()} view.
+ * The result is never null; it is empty when no mutation qualifies.
+ * Because the map is a HashMap, no row ordering is guaranteed.
+ */
+ private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context) throws IOException {
+ context.multiMutationMap = new HashMap<>();
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ // skip this mutation if we aren't enabling indexing
+ // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+ // should be indexed, which means we need to expose another method on the builder. Such is the
+ // way optimization go though.
+ if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ MultiMutation stored = context.multiMutationMap.get(row);
+ if (stored == null) {
+ // we haven't seen this row before, so add it
+ stored = new MultiMutation(row);
+ context.multiMutationMap.put(row, stored);
+ }
+ stored.addAll(m);
+ }
+ }
+ return context.multiMutationMap.values();
+ }
- boolean resetTimeStamp = replayWrite == null;
+ /**
+ * Overwrites, in place, the timestamp of every cell in {@code m} with {@code ts}.
+ *
+ * @param m the mutation whose cells are modified
+ * @param ts the timestamp to assign to every cell
+ * @throws IOException propagated from {@link CellUtil#setTimestamp}
+ */
+ public static void setTimestamp(Mutation m, long ts) throws IOException {
+ for (List<Cell> cells : m.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ CellUtil.setTimestamp(cell, ts);
+ }
+ }
+ }
- for (int i = 0; i < miniBatchOp.size(); i++) {
- Mutation m = miniBatchOp.getOperation(i);
- // skip this mutation if we aren't enabling indexing
- // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
- // should be indexed, which means we need to expose another method on the builder. Such is the
- // way optimization go though.
- if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
- if (resetTimeStamp) {
- // Unless we're replaying edits to rebuild the index, we update the time stamp
- // of the data table to prevent overlapping time stamps (which prevents index
- // inconsistencies as this case isn't handled correctly currently).
- for (List<Cell> cells : m.getFamilyCellMap().values()) {
- for (Cell cell : cells) {
- CellUtil.setTimestamp(cell, now);
- }
- }
- }
- // No need to write the table mutations when we're rebuilding
- // the index as they're already written and just being replayed.
- if (replayWrite == ReplayWrite.INDEX_ONLY
- || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
- miniBatchOp.setOperationStatus(i, NOWRITE);
- }
+    /**
+     * Applies the pending delete mutations of the batch to the next row states in
+     * {@code context.dataRowStates}.
+     *
+     * <p>Mutations that are IGNOREd, not enabled for indexing, or not {@link Delete}s are skipped.
+     * A row with no entry in {@code context.dataRowStates} first gets an empty (null, null) pair.
+     * Then, for each remaining delete:
+     * <ul>
+     *   <li>if the row has neither a current nor a next state, the delete targets a non-existing
+     *       row, so its operation status is set to NOWRITE;</li>
+     *   <li>if the row has a current state but no next state, the delete is left as is;</li>
+     *   <li>otherwise DeleteFamily/DeleteFamilyVersion cells remove the whole family from the next
+     *       state, and DeleteColumn/Delete cells are handed to {@code removeColumn}. If the next
+     *       state ends up with no families, it is set to null.</li>
+     * </ul>
+     *
+     * @param miniBatchOp the batch being processed; operation statuses may be updated
+     * @param context batch context whose {@code dataRowStates} map is read and updated
+     * @throws IOException if a helper fails
+     */
+    private void applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+            BatchMutateContext context) throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!this.builder.isEnabled(m) || !(m instanceof Delete)) {
+                continue;
+            }
+            ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+            Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+            if (dataRowState == null) {
+                dataRowState = new Pair<Put, Put>(null, null);
+                context.dataRowStates.put(rowKeyPtr, dataRowState);
+            }
+            Put nextDataRowState = dataRowState.getSecond();
+            if (nextDataRowState == null) {
+                if (dataRowState.getFirst() == null) {
+                    // This is a delete row mutation on a non-existing row. There is no need to apply
+                    // this mutation on the data table
+                    miniBatchOp.setOperationStatus(i, NOWRITE);
+                }
+                continue;
+            }
+            for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                for (Cell cell : cells) {
+                    switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
+                    case DeleteFamily:
+                    case DeleteFamilyVersion:
+                        // NOTE(review): HBase's DeleteFamilyVersion only masks cells whose timestamp
+                        // equals the marker's, but here it drops the whole family from the next
+                        // state, exactly like DeleteFamily. Confirm this is intended.
+                        nextDataRowState.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                        break;
+                    case DeleteColumn:
+                    case Delete:
+                        removeColumn(nextDataRowState, cell);
+                        break;
+                    default:
+                        // Any other cell type leaves the next row state unchanged
+                        break;
+                    }
+                }
+            }
+            // nextDataRowState cannot be null here (that case continued above)
+            if (nextDataRowState.getFamilyCellMap().isEmpty()) {
+                dataRowState.setSecond(null);
+            }
+        }
+    }
- // Only copy mutations if we found duplicate rows
- // which only occurs when we're partially rebuilding
- // the index (since we'll potentially have both a
- // Put and a Delete mutation for the same row).
- if (copyMutations) {
- // Add the mutation to the batch set
-
- ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
- MultiMutation stored = mutationsMap.get(row);
- // we haven't seen this row before, so add it
- if (stored == null) {
- stored = new MultiMutation(row);
- mutationsMap.put(row, stored);
- }
- stored.addAll(m);
- } else {
- originalMutations.add(m);
- }
- }
- }
+    /**
+     * Applies the pending put mutations of the batch to the next row states in
+     * {@code context.dataRowStates}. Before this method is called, each next row state is a copy
+     * of the corresponding current row state.
+     *
+     * <p>Every non-IGNOREd mutation that is enabled for indexing, Put or Delete alike, has the
+     * timestamp of all its cells overwritten with {@code now}. Each such Put is then merged into
+     * its row's next state via {@code applyNew}. If the row has no next state yet, a copy of the
+     * Put becomes the next state.
+     *
+     * @param miniBatchOp the batch being processed; its mutations' cell timestamps are rewritten
+     * @param context batch context whose {@code dataRowStates} map is read and updated
+     * @param now the timestamp assigned to every cell of the processed mutations
+     * @throws IOException if a cell timestamp cannot be set or a helper fails
+     */
+    private void applyPendingPutMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+            BatchMutateContext context, long now) throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+                continue;
+            }
+            Mutation m = miniBatchOp.getOperation(i);
+            // skip this mutation if we aren't enabling indexing
+            if (!this.builder.isEnabled(m)) {
+                continue;
+            }
+            // We update the time stamp of the data table mutations to prevent overlapping time
+            // stamps (which prevents index inconsistencies as this case isn't handled correctly
+            // currently). This method does not check for index replay itself; per the note on
+            // getCurrentRowStates, this path is only used for regular data mutations.
+            setTimestamp(m, now);
+            if (m instanceof Put) {
+                ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+                Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
+                if (dataRowState == null) {
+                    dataRowState = new Pair<Put, Put>(null, null);
+                    context.dataRowStates.put(rowKeyPtr, dataRowState);
+                }
+                Put nextDataRowState = dataRowState.getSecond();
+                dataRowState.setSecond((nextDataRowState != null)
+                        ? applyNew((Put) m, nextDataRowState) : new Put((Put) m));
+            }
+        }
+    }
- if (copyMutations || replayWrite != null) {
- mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
- }
- return mutations;
- }
+ /**
+ * Prepares the current and next data row states of the rows in {@code context.rowsToLock}.
+ * <p>
+ * The current states are read from the region by {@code getCurrentRowStates}, which also seeds
+ * each next state as a copy of its current state. Pending puts are then applied to the next states,
+ * which also rewrites the cell timestamps of the pending mutations to {@code now}. The pending
+ * deletes are applied afterwards, so within one batch deletes take effect after puts regardless of
+ * their order in the batch.
+ * <p>
+ * Does nothing when {@code context.rowsToLock} is empty; in that case {@code context.dataRowStates}
+ * is not assigned by this method.
+ */
+ private void prepareDataRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context,
+ long now) throws IOException {
+ if (context.rowsToLock.size() == 0) {
+ return;
+ }
+ // Retrieve the current row states from the data table
+ getCurrentRowStates(c, context);
+ applyPendingPutMutations(miniBatchOp, context, now);
+ applyPendingDeleteMutations(miniBatchOp, context);
+ }
- public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
- List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
- if (cellList == null) {
- return;
- }
- Iterator<Cell> cellIterator = cellList.iterator();
- while (cellIterator.hasNext()) {
- Cell cell = cellIterator.next();
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- emptyCQ, 0, emptyCQ.length) == 0) {
- cellIterator.remove();
- return;
- }
- }
- }
+ /**
+ * Removes, in place, the first cell of {@code m} that is in family {@code emptyCF} and has
+ * qualifier {@code emptyCQ}. Does nothing if {@code m} has no cells for {@code emptyCF} or none of
+ * them match. The family's cell list stays in the family map even if it becomes empty.
+ */
+ public static void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
+ List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
+ if (cellList == null) {
+ return;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ emptyCQ, 0, emptyCQ.length) == 0) {
+ cellIterator.remove();
+ return;
+ }
+ }
+ }
- private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp,
- ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
- byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
- HTableInterfaceReference hTableInterfaceReference =
- new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
- List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
- if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
- return;
- }
- List<Mutation> localUpdates = new ArrayList<Mutation>();
- Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
- while (indexUpdatesItr.hasNext()) {
- Pair<Mutation, byte[]> next = indexUpdatesItr.next();
- localUpdates.add(next.getFirst());
- }
- if (!localUpdates.isEmpty()) {
- miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
- }
- }
+ /**
+ * The index update generation for local indexes uses the existing index update generation code (i.e.,
+ * the {@link IndexBuilder} implementation).
+ * <p>
+ * The builder's updates for {@code pendingMutations} are collected into a local multimap. Only the
+ * updates whose target table is this region's own table are kept; local index updates target the
+ * data table itself. The kept updates are added to {@code miniBatchOp} at position 0 via
+ * {@code addOperationsFromCP}, and updates for any other table are discarded here.
+ * NOTE(review): presumably global index updates are produced by a separate path; confirm.
+ */
+ private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ Collection<? extends Mutation> pendingMutations,
+ PhoenixIndexMetaData indexMetaData) throws Throwable {
+ ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+ this.builder.getIndexUpdates(indexUpdates, miniBatchOp, pendingMutations, indexMetaData);
+ byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+ // Pull out only the updates addressed to this data table (i.e., the local index updates)
+ List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+ if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+ return;
+ }
+ List<Mutation> localUpdates = new ArrayList<Mutation>();
+ Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+ while (indexUpdatesItr.hasNext()) {
+ Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+ localUpdates.add(next.getFirst());
+ }
+ if (!localUpdates.isEmpty()) {
+ miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+ }
+ }
+ /**
+ * Retrieve the last committed data row state. This method is called only for regular data mutations since for
+ * rebuild (i.e., index replay) mutations include all row versions.
+ */
+
+ private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+ BatchMutateContext context) throws IOException {
+ Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+ for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+ }
+ Scan scan = new Scan();
+ ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
+ scanRanges.initializeScan(scan);
+ SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+ scan.setFilter(skipScanFilter);
+ context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
+ try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
+ boolean more = true;
+ while(more) {
+ List<Cell> cells = new ArrayList<Cell>();
+ more = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ byte[] rowKey = CellUtil.cloneRow(cells.get(0));
+ Put put = new Put(rowKey);
+ for (Cell cell : cells) {
+ put.add(cell);
+ }
+ context.dataRowStates.put(new ImmutableBytesPtr(rowKey), new Pair<Put, Put>(put, new Put(put)));
+ }
+ }
+ }
- private void prepareIndexMutations(
- ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp,
- BatchMutateContext context,
- Collection<? extends Mutation> mutations,
- long now,
- PhoenixIndexMetaData indexMetaData) throws Throwable {
- List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
- // get the current span, or just use a null-span to avoid a bunch of if statements
- try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
- Span current = scope.getSpan();
- if (current == null) {
- current = NullSpan.INSTANCE;
- }
- // get the index updates for all elements in this batch
- context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
- this.builder.getIndexUpdates(context.indexUpdates, miniBatchOp, mutations, indexMetaData);
- current.addTimelineAnnotation("Built index updates, doing preStep");
- handleLocalIndexUpdates(c, miniBatchOp, context.indexUpdates);
- context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
- int updateCount = 0;
- for (IndexMaintainer indexMaintainer : maintainers) {
- updateCount++;
- byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
- byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
- HTableInterfaceReference hTableInterfaceReference =
- new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
- List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
- for (Pair<Mutation, byte[]> update : updates) {
- // add the VERIFIED cell, which is the empty cell
- Mutation m = update.getFirst();
- if (context.rebuild) {
- if (m instanceof Put) {
- long ts = getMaxTimestamp(m);
- // Remove the empty column prepared by Index codec as we need to change its value
- removeEmptyColumn(m, emptyCF, emptyCQ);
- ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
- }
- context.preIndexUpdates.put(hTableInterfaceReference, m);
- } else {
- if (m instanceof Put) {
- // Remove the empty column prepared by Index codec as we need to change its value
- removeEmptyColumn(m, emptyCF, emptyCQ);
- // Set the status of the index row to "unverified"
- ((Put) m).addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
- // This will be done before the data table row is updated (i.e., in the first write phase)
- context.preIndexUpdates.put(hTableInterfaceReference, m);
- }
- else {
- // Set the status of the index row to "unverified"
- Put unverifiedPut = new Put(m.getRow());
- unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
- // This will be done before the data table row is updated (i.e., in the first write phase)
- context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
- }
- }
- }
- }
- TracingUtils.addAnnotation(current, "index update count", updateCount);
- }
- }
+ /**
+ * Generate the index updates for each data row from the row state obtained by merging the previous data row
+ * state with the pending row mutation.
+ */
+ private void prepareIndexMutations(BatchMutateContext context, List<IndexMaintainer> maintainers, long ts)
+ throws IOException {
+ List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new ArrayList<>(maintainers.size());
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ if (indexMaintainer.isLocalIndex()) {
+ continue;
+ }
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference));
+ }
+ for (Map.Entry<ImmutableBytesPtr, Pair<Put, Put>> entry : context.dataRowStates.entrySet()) {
+ ImmutableBytesPtr rowKeyPtr = entry.getKey();
+ Pair<Put, Put> dataRowState = entry.getValue();
+ Put currentDataRowState = dataRowState.getFirst();
+ Put nextDataRowState = dataRowState.getSecond();
+ if (currentDataRowState == null && nextDataRowState == null) {
+ continue;
+ }
+ for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) {
+ IndexMaintainer indexMaintainer = pair.getFirst();
+ HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
+ if (nextDataRowState != null) {
+ ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+ Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ nextDataRowVG, rowKeyPtr, ts, null, null);
+ if (indexPut == null) {
+ // No covered column. Just prepare an index row with the empty column
+ byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
+ null, null, HConstants.LATEST_TIMESTAMP);
+ indexPut = new Put(indexRowKey);
+ } else {
+ removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier());
+ }
+ indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier(), ts, UNVERIFIED_BYTES);
+ context.indexUpdates.put(hTableInterfaceReference,
+ new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
+ // Delete the current index row if the new index row key is different from the current one
+ if (currentDataRowState != null) {
+ ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+ byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+ null, null, HConstants.LATEST_TIMESTAMP);
+ if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
+ Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+ context.indexUpdates.put(hTableInterfaceReference,
+ new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+ }
+ }
+ } else if (currentDataRowState != null) {
+ ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+ byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
+ null, null, HConstants.LATEST_TIMESTAMP);
+ Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
+ IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
+ context.indexUpdates.put(hTableInterfaceReference,
+ new Pair<Mutation, byte[]>(del, rowKeyPtr.get()));
+ }
+ }
+ }
+ }
- protected PhoenixIndexMetaData getPhoenixIndexMetaData(
- ObserverContext<RegionCoprocessorEnvironment> observerContext,
- MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
- if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
- throw new DoNotRetryIOException(
- "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
- ", current table is:" +
- observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
- }
- return (PhoenixIndexMetaData)indexMetaData;
- }
+ /**
+ * This method prepares unverified index mutations which are applied to index tables before the data table is
+ * updated. In the three-phase update approach, in phase 1, the status of existing index rows is set to "unverified"
+ * (these rows will be deleted from the index table in phase 3), and/or new put mutations are added with the
+ * unverified status. In phase 2, data table mutations are applied. In phase 3, the status for an index table row is
+ * either set to "verified" or the row is deleted.
+ */
+ private void preparePreIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context,
+ Collection<? extends Mutation> pendingMutations,
+ long now,
+ PhoenixIndexMetaData indexMetaData) throws Throwable {
+ List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+ // get the current span, or just use a null-span to avoid a bunch of if statements
+ try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+ Span current = scope.getSpan();
+ if (current == null) {
+ current = NullSpan.INSTANCE;
+ }
+ current.addTimelineAnnotation("Built index updates, doing preStep");
+ // Handle local index updates
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ if (indexMaintainer.isLocalIndex()) {
+ handleLocalIndexUpdates(c, miniBatchOp, pendingMutations, indexMetaData);
+ break;
+ }
+ }
+ // The rest of this method is for handling global index updates
+ context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+ prepareIndexMutations(context, maintainers, now);
+
+ context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+ int updateCount = 0;
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ updateCount++;
+ byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+ for (Pair<Mutation, byte[]> update : updates) {
+ Mutation m = update.getFirst();
+ if (m instanceof Put) {
+ // This will be done before the data table row is updated (i.e., in the first write phase)
+ context.preIndexUpdates.put(hTableInterfaceReference, m);
+ } else {
+ // Set the status of the index row to "unverified"
+ Put unverifiedPut = new Put(m.getRow());
+ unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+ // This will be done before the data table row is updated (i.e., in the first write phase)
+ context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
+ }
+ }
+ }
+ TracingUtils.addAnnotation(current, "index update count", updateCount);
+ }
+ }
- private void preparePostIndexMutations(
- BatchMutateContext context,
- long now,
- PhoenixIndexMetaData indexMetaData) throws Throwable {
- context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
- List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
- // Check if we need to skip post index update for any of the rows
- for (IndexMaintainer indexMaintainer : maintainers) {
- byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
- byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
- HTableInterfaceReference hTableInterfaceReference =
- new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
- List <Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
- for (Pair<Mutation, byte[]> update : updates) {
- // Are there concurrent updates on the data table row? if so, skip post index updates
- // and let read repair resolve conflicts
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
- PendingRow pendingRow = pendingRows.get(rowKey);
- if (!pendingRow.isConcurrent()) {
- Mutation m = update.getFirst();
- if (m instanceof Put) {
- Put verifiedPut = new Put(m.getRow());
- // Set the status of the index row to "verified"
- verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
- context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
- }
- else {
- context.postIndexUpdates.put(hTableInterfaceReference, m);
- }
+ protected PhoenixIndexMetaData getPhoenixIndexMetaData(ObserverContext<RegionCoprocessorEnvironment> observerContext,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp)
+ throws IOException {
+ IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+ if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+ throw new DoNotRetryIOException(
+ "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
+ ", current table is:" +
+ observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ return (PhoenixIndexMetaData)indexMetaData;
+ }
- }
- }
- }
- // We are done with handling concurrent mutations. So we can remove the rows of this batch from
- // the collection of pending rows
- removePendingRows(context);
- context.indexUpdates.clear();
- }
+ /**
+ * IndexMaintainer.getIndexedColumns() returns the data column references for indexed columns. The data columns are
+ * grouped into three classes: pk columns (data table pk columns), indexed columns (the columns for which
+ * we want to have indexing; they form the prefix for the primary key for the index table (after salt and tenant id))
+ * and covered columns. The purpose of this method is to find out if all the indexed columns are included in the
+ * pending data table mutation pointed to by multiMutation.
+ */
+ private boolean hasAllIndexedColumns(IndexMaintainer indexMaintainer, MultiMutation multiMutation) {
+ Map<byte[], List<Cell>> familyMap = multiMutation.getFamilyCellMap();
+ for (ColumnReference columnReference : indexMaintainer.getIndexedColumns()) {
+ byte[] family = columnReference.getFamily();
+ List<Cell> cellList = familyMap.get(family);
+ if (cellList == null) {
+ return false;
+ }
+ boolean has = false;
+ for (Cell cell : cellList) {
+ if (CellUtil.matchingColumn(cell, family, columnReference.getQualifier())) {
+ has = true;
+ break;
+ }
+ }
+ if (!has) {
+ return false;
+ }
+ }
+ return true;
+ }
- public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
- MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
- ignoreAtomicOperations(miniBatchOp);
- PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
- BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
- setBatchMutateContext(c, context);
- Mutation firstMutation = miniBatchOp.getOperation(0);
- ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
- context.rebuild = replayWrite != null;
- /*
- * Exclusively lock all rows so we get a consistent read
- * while determining the index updates
- */
- long now;
- if (!context.rebuild) {
- populateRowsToLock(miniBatchOp, context);
- lockRows(context);
- now = EnvironmentEdgeManager.currentTimeMillis();
- // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
- // concurrent updates
- populatePendingRows(context);
- }
- else {
- now = EnvironmentEdgeManager.currentTimeMillis();
- }
- // First group all the updates for a single row into a single update to be processed
- Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
- // early exit if it turns out we don't have any edits
- if (mutations == null) {
- return;
- }
+ private void preparePostIndexMutations(BatchMutateContext context, long now, PhoenixIndexMetaData indexMetaData,
+ String tableName)
+ throws Throwable {
+ context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+ List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+ // Check if we need to skip post index update for any of the rows
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+ for (Pair<Mutation, byte[]> update : updates) {
+ // Are there concurrent updates on the data table row? If so, skip post index updates
+ // and let read repair resolve conflicts
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+ PendingRow pendingRow = pendingRows.get(rowKey);
+ if (!pendingRow.isConcurrent()) {
+ Mutation m = update.getFirst();
+ if (m instanceof Put) {
+ Put verifiedPut = new Put(m.getRow());
+ // Set the status of the index row to "verified"
+ verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+ context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
+ } else {
+ context.postIndexUpdates.put(hTableInterfaceReference, m);
+ }
+ } else {
+ if (!hasAllIndexedColumns(indexMaintainer, context.multiMutationMap.get(rowKey))) {
+ // This batch needs to be retried since one of the concurrent mutations does not have the value
+ // for an indexed column. Not including an indexed column may lead to incorrect index row key
+ // generation for concurrent mutations since concurrent mutations are not serialized entirely
+ // and do not see each other's effect on the data table. Throwing an IOException will result in
+ // retries of this batch. Before throwing the exception, we need to remove reference counts and
+ // locks for the rows of this batch
+ removePendingRows(context);
+ context.indexUpdates.clear();
+ for (RowLock rowLock : context.rowLocks) {
+ rowLock.release();
+ }
+ context.rowLocks.clear();
+ throw new IOException("One of the concurrent mutations does not have all indexed columns. " +
+ "The batch needs to be retried " + tableName);
+ }
+ }
+ }
+ }
- long start = EnvironmentEdgeManager.currentTimeMillis();
- prepareIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
- metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
-
- // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
- // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
- // can be prepared in less than one millisecond
- if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
- Thread.sleep(1);
- LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
- }
- // Release the locks before making RPC calls for index updates
- for (RowLock rowLock : context.rowLocks) {
- rowLock.release();
- }
- // Do the first phase index updates
- doPre(c, context, miniBatchOp);
- if (!context.rebuild) {
- // Acquire the locks again before letting the region proceed with data table updates
- List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
- for (RowLock rowLock : context.rowLocks) {
- rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
- }
- context.rowLocks.clear();
- context.rowLocks = rowLocks;
- preparePostIndexMutations(context, now, indexMetaData);
- }
- if (failDataTableUpdatesForTesting) {
- throw new DoNotRetryIOException("Simulating the data table write failure");
- }
- }
+ // We are done with handling concurrent mutations. So we can remove the rows of this batch from
+ // the collection of pending rows
+ removePendingRows(context);
+ context.indexUpdates.clear();
+ }
+
+ /**
+ * There are at most two rebuild mutations for every row, one put and one delete. They are listed in indexMutations
+ * next to each other by {@link IndexRebuildRegionScanner} such that the put comes before the delete. This method is called
+ * only for global indexes.
+ */
+ private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ BatchMutateContext context,
+ IndexMaintainer indexMaintainer) throws Throwable {
+ Put put = null;
+ List <Mutation> indexMutations = new ArrayList<>();
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+ continue;
+ }
+ Mutation m = miniBatchOp.getOperation(i);
+ if (!this.builder.isEnabled(m)) {
+ continue;
+ }
+ if (m instanceof Put) {
+ if (put != null) {
+ indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+ }
+ put = (Put)m;
+ } else {
+ indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete)m));
+ put = null;
+ }
+ miniBatchOp.setOperationStatus(i, NOWRITE);
+ }
+ if (put != null) {
+ indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
+ }
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+ for (Mutation m : indexMutations) {
+ context.preIndexUpdates.put(hTableInterfaceReference, m);
+ }
+ doPre(c, context, miniBatchOp);
+ // For rebuild updates, no post index update is prepared. Just create an empty list.
+ context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+ }
+
+ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+ ignoreAtomicOperations(miniBatchOp);
+ PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
+ BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
+ setBatchMutateContext(c, context);
+ Mutation firstMutation = miniBatchOp.getOperation(0);
+ ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+ context.rebuild = replayWrite != null;
+ if (context.rebuild) {
+ preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0));
+ return;
+ }
+ /*
+ * Exclusively lock all rows so we get a consistent read
+ * while determining the index updates
+ */
+ long now;
+ populateRowsToLock(miniBatchOp, context);
+ lockRows(context);
+ now = EnvironmentEdgeManager.currentTimeMillis();
+ // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+ // concurrent updates
+ populatePendingRows(context);
+ // Prepare current and next data row states for pending mutations (for global indexes)
+ prepareDataRowStates(c, miniBatchOp, context, now);
+ // Group all the updates for a single row into a single update to be processed (for local indexes)
+ Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
+ // early exit if it turns out we don't have any edits
+ if (mutations == null || mutations.isEmpty()) {
+ return;
+ }
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ preparePreIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
+ metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
+ // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
+ // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
+ // can be prepared in less than one millisecond
+ if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
+ Thread.sleep(1);
+ LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ }
+ // Release the locks before making RPC calls for index updates
+ for (RowLock rowLock : context.rowLocks) {
+ rowLock.release();
+ }
+ // Do the first phase index updates
+ doPre(c, context, miniBatchOp);
+ // Acquire the locks again before letting the region proceed with data table updates
+ List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
+ for (RowLock rowLock : context.rowLocks) {
+ rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
+ }
+ context.rowLocks.clear();
+ context.rowLocks = rowLocks;
+ preparePostIndexMutations(context, now, indexMetaData,
+ c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+ if (failDataTableUpdatesForTesting) {
+ throw new DoNotRetryIOException("Simulating the data table write failure");
+ }
+ }
private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
this.batchMutateContext.set(context);
}
-
+
private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
return this.batchMutateContext.get();
}
-
+
private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
this.batchMutateContext.remove();
}
@@ -828,14 +1074,6 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
}
/**
- * Exposed for testing!
- * @return the currently instantiated index builder
- */
- public IndexBuilder getBuilderForTesting() {
- return this.builder.getBuilderForTesting();
- }
-
- /**
* Enable indexing on the given table
* @param descBuilder {@link TableDescriptor} for the table on which indexing should be enabled
* @param builder class to use when building the index for this table
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 8c9a2d9..7004c81 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -1620,7 +1620,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
*/
private void initCachedState() {
byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst();
- dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+ dataEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, emptyKvQualifier);
this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size());
// columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 98000f7..8d1b4db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -63,8 +64,8 @@ public class PhoenixIndexImportDirectReducer extends
long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
- IndexRebuildRegionScanner.VerificationResult verificationResult =
- IndexRebuildRegionScanner.VerificationResult.getVerificationResult(hTable, ts);
+ IndexToolVerificationResult verificationResult =
+ IndexToolVerificationResult.getVerificationResult(hTable, ts);
context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
setValue(verificationResult.getScannedDataRowCount());
context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
new file mode 100644
index 0000000..56ec027
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
@@ -0,0 +1,732 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.index;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class PrepareIndexMutationsForRebuildTest extends BaseConnectionlessQueryTest {
+ private static String ROW_KEY = "k1";
+ private static String TABLE_NAME = "dataTable";
+ private static String INDEX_NAME = "idx";
+
+ class SetupInfo {
+ public IndexMaintainer indexMaintainer;
+ public PTable pDataTable;
+ }
+
+ /**
+ * Get the index maintainer and phoenix table definition of data table.
+ * @param tableName
+ * @param indexName
+ * @param columns
+ * @param indexColumns
+ * @param pk
+ * @param includeColumns
+ * @return
+ * @throws Exception
+ */
+ private SetupInfo setup(String tableName,
+ String indexName,
+ String columns,
+ String indexColumns,
+ String pk,
+ String includeColumns) throws Exception {
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+
+ String fullTableName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(""), SchemaUtil.normalizeIdentifier(tableName));
+ String fullIndexName = SchemaUtil.getTableName(SchemaUtil.normalizeIdentifier(""), SchemaUtil.normalizeIdentifier(indexName));
+
+ // construct the data table and index based from the parameters
+ String str1 = String.format("CREATE TABLE %1$s (%2$s CONSTRAINT pk PRIMARY KEY (%3$s)) COLUMN_ENCODED_BYTES=0",
+ fullTableName,
+ columns,
+ pk);
+ conn.createStatement().execute(str1);
+
+ String str2 = String.format("CREATE INDEX %1$s ON %2$s (%3$s)",
+ fullIndexName,
+ fullTableName,
+ indexColumns);
+ if (!includeColumns.isEmpty())
+ str2 += " INCLUDE (" + includeColumns + ")";
+ conn.createStatement().execute(str2);
+
+ // Get the data table, index table and index maintainer reference from the client's ConnectionQueryServiceImpl
+ // In this way, we don't need to setup a local cluster.
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ PTable pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName));
+ PTable pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+ IndexMaintainer im = pIndexTable.getIndexMaintainer(pDataTable, pconn);
+
+ SetupInfo info = new SetupInfo();
+ info.indexMaintainer = im;
+ info.pDataTable = pDataTable;
+ return info;
+ }
+ }
+
+ /**
+ * Simulate one put mutation on the indexed column
+ * @throws Exception
+ */
+ @Test
+ public void testSinglePutOnIndexColumn() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "");
+
+ // insert a row
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 1,
+ Bytes.toBytes("v1"));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ null);
+
+ // Expect one row of index with row key "v1_k1"
+ Put idxPut1 = new Put(generateIndexRowKey("v1"));
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+
+ assertEqualMutationList(Arrays.asList((Mutation)idxPut1), actualIndexMutations);
+ }
+
+ /**
+ * Simulate one put mutation on the non-indexed column
+ * @throws Exception
+ */
+ @Test
+ public void testSinglePutOnNonIndexColumn() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "");
+
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ null);
+
+ // Expect one row of index with row key "_k1", as indexed column C1 is nullable.
+ Put idxPut1 = new Put(generateIndexRowKey(null));
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+
+ assertEqualMutationList(Arrays.asList((Mutation)idxPut1), actualIndexMutations);
+ }
+
+ /**
+ * Simulate the column delete on the index column
+ * @throws Exception
+ */
+ @Test
+ public void testDelOnIndexColumn() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "");
+
+ // insert the row for deletion
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 1,
+ Bytes.toBytes("v1"));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+ // only delete the value of column C1
+ Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+ addCellToDelMutation(dataDel,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 2,
+ KeyValue.Type.DeleteColumn);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ dataDel);
+
+ List<Mutation> expectedIndexMutation = new ArrayList<>();
+
+ // generate the index row key "v1_k1"
+ byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+ Put idxPut1 = new Put(idxKeyBytes);
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+ expectedIndexMutation.add(idxPut1);
+
+ // generate the index row key "_k1"
+ Put idxPut2 = new Put(generateIndexRowKey(null));
+ addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+ expectedIndexMutation.add(idxPut2);
+
+ // This deletion is to remove the row added by the idxPut1, as idxPut2 has different row key as idxPut1.
+ // Otherwise the row "v1_k1" will still be shown in the scan result
+ Delete idxDel = new Delete(idxKeyBytes);
+ addCellToDelMutation(idxDel,
+ QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+ null,
+ 2,
+ KeyValue.Type.DeleteFamily);
+ expectedIndexMutation.add(idxDel);
+
+ assertEqualMutationList(expectedIndexMutation, actualIndexMutations);
+ }
+
+ /**
+ * Simulate the column delete on the non-indexed column
+ * @throws Exception
+ */
+ @Test
+ public void testDelOnNonIndexColumn() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "");
+
+ // insert the row for deletion
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 1,
+ Bytes.toBytes("v1"));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+ // delete the value of column C2
+ Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+ addCellToDelMutation(dataDel,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 2,
+ KeyValue.Type.DeleteColumn);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ dataDel);
+
+ List<Mutation> expectedIndexMutations = new ArrayList<>();
+
+ byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+ // idxPut1 is the corresponding index mutation of dataPut
+ Put idxPut1 = new Put(idxKeyBytes);
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+ expectedIndexMutations.add(idxPut1);
+
+ // idxPut2 is required to update the timestamp, so the index row will have the same life time as its corresponding data row.
+ // No delete mutation is expected on index table, as data mutation happens only on non-indexed column.
+ Put idxPut2 = new Put(idxKeyBytes);
+ addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+ expectedIndexMutations.add(idxPut2);
+
+ assertEqualMutationList(expectedIndexMutations, actualIndexMutations);
+ }
+
+ /**
+ * Simulate the data deletion of all version on the indexed row
+ * @throws Exception
+ */
+ @Test
+ public void testDeleteAllVersions() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "");
+
+ // insert two versions for a single row
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 1,
+ Bytes.toBytes("v1"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 2,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 2);
+
+ // DeleteFamily will delete all versions of the columns in that family
+ // Since C1 is the only column of the default column family, so deleting the default family removes all version
+ // of column C1
+ Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+ addCellToDelMutation(dataDel,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ null,
+ 3,
+ KeyValue.Type.DeleteFamily);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ dataDel);
+
+ List<Mutation> expectedIndexMutations = new ArrayList<>();
+
+ byte[] idxKeyBytes1 = generateIndexRowKey("v1");
+ byte[] idxKeyBytes2 = generateIndexRowKey("v2");
+
+ // idxPut1 and idxPut2 are generated by two versions in dataPut
+ Put idxPut1 = new Put(idxKeyBytes1);
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+ expectedIndexMutations.add(idxPut1);
+
+ Put idxPut2 = new Put(idxKeyBytes2);
+ addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+ expectedIndexMutations.add(idxPut2);
+
+ // idxDel1 is required to remove the row key "v1_k1" which is added by idxPut1.
+ // The ts of idxDel1 is same as idxPut2, because it is a result of idxPut2.
+ // Since C1 is the only index column, so it is translated to DeleteFamily mutation.
+ Delete idxDel1 = new Delete(idxKeyBytes1);
+ addCellToDelMutation(idxDel1,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ null,
+ 2,
+ KeyValue.Type.DeleteFamily);
+ expectedIndexMutations.add(idxDel1);
+
+ // idxDel2 is corresponding index mutation of dataDel
+ Delete idxDel2 = new Delete(idxKeyBytes2);
+ addCellToDelMutation(idxDel2,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ null,
+ 3,
+ KeyValue.Type.DeleteFamily);
+ expectedIndexMutations.add(idxDel2);
+
+ assertEqualMutationList(expectedIndexMutations, actualIndexMutations);
+ }
+
+ // Simulate the put and delete mutation with the same time stamp on the index
+ @Test
+ public void testPutDeleteOnSameTimeStamp() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "");
+
+ // insert a row
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 1,
+ Bytes.toBytes("v1"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable,1);
+
+ // delete column of C1 from the inserted row
+ Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+ addCellToDelMutation(dataDel,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 1,
+ KeyValue.Type.DeleteColumn);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ dataDel);
+
+ List<Mutation> expectedIndexMutations = new ArrayList<>();
+
+ // The dataDel will be applied on top of dataPut when we replay them for index rebuild, when they have the same time stamp.
+ // idxPut1 is expected as in data table we still see the row of k1 with empty C1, so we need a row in index table with row key "_k1"
+ Put idxPut1 = new Put(generateIndexRowKey(null));
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+ expectedIndexMutations.add(idxPut1);
+
+ assertEqualMutationList(Arrays.asList((Mutation)idxPut1), actualIndexMutations);
+ }
+
+ // Simulate the put and delete mutation on the covered column of data table
+ @Test
+ public void testCoveredIndexColumns() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "C2");
+
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 1,
+ Bytes.toBytes("v1"));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+ Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+ addCellToDelMutation(dataDel,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C1"),
+ 2,
+ KeyValue.Type.DeleteColumn);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ dataDel);
+
+ List<Mutation> expectedIndexMutations = new ArrayList<>();
+ byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+ // idxPut1 is generated corresponding to dataPut.
+ // The column "0:C2" is generated from data table column family and column name, its family name is still default family name of index table
+ Put idxPut1 = new Put(idxKeyBytes);
+ addCellToPutMutation(idxPut1,
+ QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+ Bytes.toBytes("0:C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+ expectedIndexMutations.add(idxPut1);
+
+ // idxKey2 is required by dataDel, as dataDel change the corresponding row key of index table
+ List<Byte> idxKey2 = new ArrayList<>();
+ idxKey2.add(QueryConstants.SEPARATOR_BYTE);
+ idxKey2.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(ROW_KEY)));
+ byte[] idxKeyBytes2 = com.google.common.primitives.Bytes.toArray(idxKey2);
+ Put idxPut2 = new Put(idxKeyBytes2);
+ addCellToPutMutation(idxPut2,
+ QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+ Bytes.toBytes("0:C2"),
+ 2,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+ expectedIndexMutations.add(idxPut2);
+
+ // idxDel is required to invalid the index row "v1_k1", dataDel removed the value of indexed column
+ Delete idxDel = new Delete(idxKeyBytes);
+ addCellToDelMutation(idxDel,
+ QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+ null,
+ 2,
+ KeyValue.Type.DeleteFamily);
+ expectedIndexMutations.add(idxDel);
+
+ assertEqualMutationList(expectedIndexMutations, actualIndexMutations);
+ }
+
+ // Simulate the scenario that index column, and covered column belong to different column families
+ @Test
+ public void testForMultipleFamilies() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, CF1.C1 VARCHAR, CF2.C2 VARCHAR", //define C1 and C2 with different families
+ "CF1.C1",
+ "ROW_KEY",
+ "CF2.C2");
+
+ // insert a row to the data table
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ Bytes.toBytes("CF1"),
+ Bytes.toBytes("C1"),
+ 1,
+ Bytes.toBytes("v1"));
+ addCellToPutMutation(dataPut,
+ Bytes.toBytes("CF2"),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+
+ // delete the indexed column CF1:C1
+ Delete dataDel = new Delete(Bytes.toBytes(ROW_KEY));
+ addCellToDelMutation(dataDel,
+ Bytes.toBytes("CF1"),
+ Bytes.toBytes("C1"),
+ 2,
+ KeyValue.Type.DeleteColumn);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ dataDel);
+
+ List<Mutation> expectedIndexMutation = new ArrayList<>();
+
+ byte[] idxKeyBytes = generateIndexRowKey("v1");
+
+ // index table will use the family name of the first covered column, which is CF2 here.
+ Put idxPut1 = new Put(idxKeyBytes);
+ addCellToPutMutation(idxPut1,
+ Bytes.toBytes("CF2"),
+ Bytes.toBytes("CF2:C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+ expectedIndexMutation.add(idxPut1);
+
+ // idxPut2 and idxDel are the result of dataDel
+ // idxPut2 is to create the index row "_k1", idxDel is to invalid the index row "v1_k1".
+ Put idxPut2 = new Put(generateIndexRowKey(null));
+ addCellToPutMutation(idxPut2,
+ Bytes.toBytes("CF2"),
+ Bytes.toBytes("CF2:C2"),
+ 2,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+ expectedIndexMutation.add(idxPut2);
+
+ Delete idxDel = new Delete(idxKeyBytes);
+ addCellToDelMutation(idxDel,
+ Bytes.toBytes("CF2"),
+ null,
+ 2,
+ KeyValue.Type.DeleteFamily);
+ expectedIndexMutation.add(idxDel);
+
+ assertEqualMutationList(expectedIndexMutation, actualIndexMutations);
+ }
+
+ // Simulate two data put with the same value but different time stamp.
+ // We expect to see 2 index mutations with same value but different time stamps.
+ @Test
+ public void testSameTypeOfMutationWithSameValueButDifferentTimeStamp() throws Exception {
+ SetupInfo info = setup(TABLE_NAME,
+ INDEX_NAME,
+ "ROW_KEY VARCHAR, C1 VARCHAR, C2 VARCHAR",
+ "C1",
+ "ROW_KEY",
+ "");
+
+ Put dataPut = new Put(Bytes.toBytes(ROW_KEY));
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v2"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
+ addCellToPutMutation(dataPut,
+ info.indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ Bytes.toBytes("C2"),
+ 1,
+ Bytes.toBytes("v3"));
+ addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 2);
+
+ List<Mutation> actualIndexMutations = IndexRebuildRegionScanner.prepareIndexMutationsForRebuild(info.indexMaintainer,
+ dataPut,
+ null);
+
+ byte[] idxKeyBytes = generateIndexRowKey(null);
+
+ // idxPut1 and idxPut2 have same value but different time stamp
+ Put idxPut1 = new Put(idxKeyBytes);
+ addEmptyColumnToIndexPutMutation(idxPut1, info.indexMaintainer, 1);
+
+ Put idxPut2 = new Put(idxKeyBytes);
+ addEmptyColumnToIndexPutMutation(idxPut2, info.indexMaintainer, 2);
+
+ assertEqualMutationList(Arrays.asList((Mutation)idxPut1, (Mutation)idxPut2), actualIndexMutations);
+ }
+
+ /**
+ * Generate the row key for index table by the value of indexed column
+ * @param indexVal
+ * @return
+ */
+ byte[] generateIndexRowKey(String indexVal) {
+ List<Byte> idxKey = new ArrayList<>();
+ if (indexVal != null && !indexVal.isEmpty())
+ idxKey.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(indexVal)));
+ idxKey.add(QueryConstants.SEPARATOR_BYTE);
+ idxKey.addAll(com.google.common.primitives.Bytes.asList(Bytes.toBytes(ROW_KEY)));
+ return com.google.common.primitives.Bytes.toArray(idxKey);
+ }
+
+ void addCellToPutMutation(Put put, byte[] family, byte[] column, long ts, byte[] value) throws Exception {
+ byte[] rowKey = put.getRow();
+ Cell cell = CellUtil.createCell(rowKey, family, column, ts, KeyValue.Type.Put.getCode(), value);
+ put.add(cell);
+ }
+
+ void addCellToDelMutation(Delete del, byte[] family, byte[] column, long ts, KeyValue.Type type) throws Exception {
+ byte[] rowKey = del.getRow();
+ Cell cell = CellUtil.createCell(rowKey, family, column, ts, type.getCode(), null);
+ del.addDeleteMarker(cell);
+ }
+
+ /**
+ * Add Empty column to the existing data put mutation
+ * @param put
+ * @param ptable
+ * @param ts
+ * @throws Exception
+ */
+ void addEmptyColumnToDataPutMutation(Put put, PTable ptable, long ts) throws Exception {
+ addCellToPutMutation(put,
+ SchemaUtil.getEmptyColumnFamily(ptable),
+ QueryConstants.EMPTY_COLUMN_BYTES,
+ ts,
+ QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ }
+
+ /**
+ * Add the verified flag to the existing index put mutation
+ * @param put
+ * @param im
+ * @param ts
+ * @throws Exception
+ */
+ void addEmptyColumnToIndexPutMutation(Put put, IndexMaintainer im, long ts) throws Exception {
+ addCellToPutMutation(put,
+ im.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ QueryConstants.EMPTY_COLUMN_BYTES,
+ ts,
+ IndexRegionObserver.VERIFIED_BYTES);
+ }
+
+ /**
+ * Compare two mutation lists without worrying about the order of the mutations in the lists
+ * @param expectedMutations
+ * @param actualMutations
+ */
+ void assertEqualMutationList(List<Mutation> expectedMutations,
+ List<Mutation> actualMutations) {
+ assertEquals(expectedMutations.size(), actualMutations.size());
+ for (Mutation expected : expectedMutations) {
+ boolean found = false;
+ for (Mutation actual: actualMutations) {
+ if (isEqualMutation(expected, actual)) {
+ actualMutations.remove(actual);
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ Assert.fail(String.format("Cannot find mutation:%s", expected));
+ }
+ }
+
+ /**
+ * Compare two mutations without worrying about the order of cells within each mutation
+ * @param expectedMutation
+ * @param actualMutation
+ * @return
+ */
+ boolean isEqualMutation(Mutation expectedMutation, Mutation actualMutation){
+ List<Cell> expectedCells = new ArrayList<>();
+ for (List<Cell> cells : expectedMutation.getFamilyCellMap().values()) {
+ expectedCells.addAll(cells);
+ }
+
+ List<Cell> actualCells = new ArrayList<>();
+ for (List<Cell> cells : actualMutation.getFamilyCellMap().values()) {
+ actualCells.addAll(cells);
+ }
+
+ if (expectedCells.size() != actualCells.size())
+ return false;
+ for(Cell expected : expectedCells) {
+ boolean found = false;
+ for(Cell actual: actualCells){
+ if (isEqualCell(expected, actual)) {
+ actualCells.remove(actual);
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ return false;
+ }
+
+ return true;
+ }
+
+ boolean isEqualCell(Cell a, Cell b) {
+ return CellUtil.matchingRow(a, b)
+ && CellUtil.matchingFamily(a, b)
+ && CellUtil.matchingQualifier(a, b)
+ && CellUtil.matchingTimestamp(a, b)
+ && CellUtil.matchingType(a, b)
+ && CellUtil.matchingValue(a, b);
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
new file mode 100644
index 0000000..fbb022d
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
+
+    private static final int INDEX_TABLE_EXPIRY_SEC = 1;
+    private static final String UNEXPECTED_COLUMN = "0:UNEXPECTED_COLUMN";
+
+    // Column names of the data table created from CREATE_TABLE_DDL.
+    public static final String FIRST_ID = "FIRST_ID";
+    public static final String SECOND_ID = "SECOND_ID";
+    public static final String FIRST_VALUE = "FIRST_VALUE";
+    public static final String SECOND_VALUE = "SECOND_VALUE";
+
+    // Data table with composite primary key (FIRST_ID, SECOND_ID) and column encoding disabled.
+    public static final String CREATE_TABLE_DDL =
+            "CREATE TABLE IF NOT EXISTS %s (FIRST_ID BIGINT NOT NULL, "
+            + "SECOND_ID BIGINT NOT NULL, FIRST_VALUE VARCHAR(20), "
+            + "SECOND_VALUE INTEGER "
+            + "CONSTRAINT PK PRIMARY KEY(FIRST_ID, SECOND_ID)) COLUMN_ENCODED_BYTES=0";
+
+    // Index on SECOND_VALUE that covers FIRST_VALUE.
+    public static final String CREATE_INDEX_DDL =
+            "CREATE INDEX %s ON %s (SECOND_VALUE) INCLUDE (FIRST_VALUE)";
+
+    // DML templates: %s are filled with table/column names, ? are JDBC bind parameters.
+    public static final String COMPLETE_ROW_UPSERT = "UPSERT INTO %s VALUES (?,?,?,?)";
+    public static final String PARTIAL_ROW_UPSERT = "UPSERT INTO %s (%s, %s, %s) VALUES (?,?,?)";
+    public static final String DELETE_ROW_DML = "DELETE FROM %s WHERE %s = ? AND %s = ?";
+
+    // Presumably the index-side qualifier of the covered FIRST_VALUE column; verify at use sites.
+    public static final String INCLUDED_COLUMN = "0:FIRST_VALUE";
+
+    @Rule
+    public ExpectedException exceptionRule = ExpectedException.none();
+
+    /**
+     * Scenarios for the single-row verification tests. Per their names, VALID_* cases are
+     * presumably expected to verify and the others to fail; confirm at the use sites.
+     */
+    private enum TestType {
+        /** Set of mutations matching the expected mutations. */
+        VALID_EXACT_MATCH,
+        /** Mix of delete and put mutations. */
+        VALID_MIX_MUTATIONS,
+        /** Only incoming unverified mutations. */
+        VALID_NEW_UNVERIFIED_MUTATIONS,
+        /** Extra mutations mimicking incoming mutations. */
+        VALID_MORE_MUTATIONS,
+        EXPIRED,
+        INVALID_EXTRA_CELL,
+        INVALID_EMPTY_CELL,
+        INVALID_CELL_VALUE,
+        INVALID_COLUMN
+    }
+
+ public static class UnitTestClock extends EnvironmentEdge {
+ long initialTime;
+ long delta;
+
+ public UnitTestClock(long delta) {
+ initialTime = System.currentTimeMillis() + delta;
+ this.delta = delta;
+ }
+
+ @Override
+ public long currentTime() {
+ return System.currentTimeMillis() + delta;
+ }
+ }
+
+    // Mocks initialized by MockitoAnnotations.initMocks in setup().
+    @Mock
+    Result indexRow;
+    @Mock
+    IndexRebuildRegionScanner rebuildScanner;
+
+    List<Mutation> actualMutationList;
+
+    // Names of the objects created by createDBObject().
+    String schema;
+    String table;
+    String dataTableFullName;
+    String index;
+    String indexTableFullName;
+
+    // Table definitions resolved by createDBObject().
+    PTable pIndexTable;
+    PTable pDataTable;
+
+    Put put;
+    Delete delete;
+    PhoenixConnection pconn;
+    IndexToolVerificationResult.PhaseResult actualPR;
+    // NOTE(review): byte[] keys use identity equals/hashCode in a plain Map; confirm the
+    // implementation assigned here (e.g. a TreeMap with Bytes.BYTES_COMPARATOR) handles that.
+    public Map<byte[], List<Mutation>> indexKeyToMutationMapLocal;
+    private IndexMaintainer indexMaintainer;
+
+    /**
+     * Initializes the mocks, creates the test table and index, records the data table
+     * mutations, and prepares the rebuild scanner and shared Mockito stubs.
+     */
+    @Before
+    public void setup() throws SQLException, IOException {
+        MockitoAnnotations.initMocks(this);
+        // Schema objects must exist before mutations can be generated against them.
+        createDBObject();
+        createMutationsWithUpserts();
+        initializeRebuildScannerAttributes();
+        initializeGlobalMockitoSetup();
+    }
+
+    /**
+     * Creates a uniquely named data table and index, and stores the connection and both
+     * {@link PTable} definitions in fields.
+     *
+     * @throws SQLException if the DDL fails or the tables cannot be resolved
+     */
+    public void createDBObject() throws SQLException {
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            schema = generateUniqueName();
+            table = generateUniqueName();
+            index = generateUniqueName();
+            dataTableFullName = SchemaUtil.getQualifiedTableName(schema, table);
+            indexTableFullName = SchemaUtil.getQualifiedTableName(schema, index);
+
+            // Close the statement explicitly instead of leaking one per DDL.
+            try (java.sql.Statement stmt = conn.createStatement()) {
+                stmt.execute(String.format(CREATE_TABLE_DDL, dataTableFullName));
+                stmt.execute(String.format(CREATE_INDEX_DDL, index, dataTableFullName));
+            }
+            conn.commit();
+
+            // NOTE(review): pconn outlives this try block, which closes the connection; confirm
+            // later users only need it for metadata lookups.
+            pconn = conn.unwrap(PhoenixConnection.class);
+            pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), indexTableFullName));
+            pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), dataTableFullName));
+        }
+    }
+
+ private void createMutationsWithUpserts() throws SQLException, IOException {
+ deleteRow(2, 3);
+ upsertPartialRow(2, 3, "abc");
+ upsertCompleteRow(2, 3, "hik", 8);
+ upsertPartialRow(2, 3, 10);
+ upsertPartialRow(2,3,4);
+ deleteRow(2, 3);
+ upsertPartialRow(2,3, "def");
+ upsertCompleteRow(2, 3, null, 20);
+ upsertPartialRow(2,3, "wert");
+ }
+
+    /**
+     * Deletes row (key1, key2) through Phoenix without committing, then passes the connection
+     * to convertUpsertToMutations.
+     *
+     * @param key1 value of FIRST_ID
+     * @param key2 value of SECOND_ID
+     */
+    private void deleteRow(int key1, int key2) throws SQLException, IOException {
+        String dml = String.format(DELETE_ROW_DML, dataTableFullName, FIRST_ID, SECOND_ID);
+        // The PreparedStatement is now closed too, instead of relying on the connection close.
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             PreparedStatement ps = conn.prepareStatement(dml)) {
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    /**
+     * Upserts only FIRST_VALUE for row (key1, key2) without committing, then passes the
+     * connection to convertUpsertToMutations.
+     *
+     * @param key1 value of FIRST_ID
+     * @param key2 value of SECOND_ID
+     * @param val1 new FIRST_VALUE
+     */
+    private void upsertPartialRow(int key1, int key2, String val1)
+            throws SQLException, IOException {
+        String dml = String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+                FIRST_VALUE);
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties());
+             PreparedStatement ps = conn.prepareStatement(dml)) {
+            ps.setInt(1, key1);
+            ps.setInt(2, key2);
+            ps.setString(3, val1);
+            ps.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    /**
+     * Upserts {@code value1} into the column named by SECOND_VALUE for row ({@code key1},
+     * {@code key2}) without committing, then folds the cells into the shared put/delete fields.
+     */
+    private void upsertPartialRow(int key1, int key2, int value1)
+            throws SQLException, IOException {
+        String upsertSql = String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID,
+                SECOND_ID, SECOND_VALUE);
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            PreparedStatement stmt = conn.prepareStatement(upsertSql);
+            stmt.setInt(1, key1);
+            stmt.setInt(2, key2);
+            stmt.setInt(3, value1);
+            stmt.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+    /**
+     * Binds {@code key1}, {@code key2}, {@code val1} (may be null) and {@code val2} to
+     * COMPLETE_ROW_UPSERT without committing, then folds the cells into the shared
+     * put/delete fields.
+     */
+    private void upsertCompleteRow(int key1, int key2, String val1, int val2)
+            throws SQLException, IOException {
+        String upsertSql = String.format(COMPLETE_ROW_UPSERT, dataTableFullName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+            PreparedStatement stmt = conn.prepareStatement(upsertSql);
+            stmt.setInt(1, key1);
+            stmt.setInt(2, key2);
+            stmt.setString(3, val1);
+            stmt.setInt(4, val2);
+            stmt.execute();
+            convertUpsertToMutations(conn);
+        }
+    }
+
+ private void convertUpsertToMutations(Connection conn) throws SQLException, IOException {
+ Iterator<Pair<byte[],List<Cell>>>
+ dataTableNameAndMutationKeyValuesIter = PhoenixRuntime.getUncommittedDataIterator(conn);
+ Pair<byte[], List<Cell>> elem = dataTableNameAndMutationKeyValuesIter.next();
+ byte[] key = CellUtil.cloneRow(elem.getSecond().get(0));
+ long mutationTS = EnvironmentEdgeManager.currentTimeMillis();
+
+ for (Cell kv : elem.getSecond()) {
+ Cell cell =
+ CellUtil.createCell(CellUtil.cloneRow(kv), CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+ mutationTS, kv.getTypeByte(), CellUtil.cloneValue(kv));
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null ) {
+ put = new Put(key);
+ }
+ put.add(cell);
+ } else {
+ if (delete == null) {
+ delete = new Delete(key);
+ }
+ delete.addDeleteMarker(cell);
+ }
+ }
+ }
+
+    /**
+     * Builds the maintainer for the test index, lets the mocked scanner run its real setters,
+     * and then configures it with an unlimited (HConstants.FOREVER) index TTL and that maintainer.
+     */
+    private void initializeRebuildScannerAttributes() {
+        // Pure metadata work; it does not touch the mock, so it can run before the stubbing.
+        indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn);
+
+        when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
+        when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any()))
+                .thenCallRealMethod();
+        when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod();
+
+        rebuildScanner.setIndexTableTTL(HConstants.FOREVER);
+        rebuildScanner.setIndexMaintainer(indexMaintainer);
+    }
+
+ /**
+  * Stubs the scanner methods the tests exercise so that their real implementations run, turns
+  * both logToIndexToolOutputTable overloads into no-ops, and runs prepareIndexMutations(put,
+  * delete) twice. The first run fills indexKeyToMutationMapLocal, which the tests copy and
+  * tamper with to build "actual" index rows. The second run fills a separate map that remains
+  * installed on the scanner. Presumably that second map is the expected side consulted by
+  * verifySingleIndexRow, kept apart so that in-place edits to the local map's mutations do not
+  * leak into it. Confirm in IndexRebuildRegionScanner.
+  */
+ private void initializeGlobalMockitoSetup() throws IOException {
+ // Real implementations for the methods under test (must precede the calls below).
+ when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod();
+ when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod();
+ when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(),
+ Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();
+ // Silence both logging overloads so verification does not write to an output table.
+ doNothing().when(rebuildScanner)
+ .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+ Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
+ Matchers.<byte[]>any(), Matchers.<byte[]>any());
+ doNothing().when(rebuildScanner)
+ .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+ Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString());
+
+ // First pass: populate the local map the tests use to build "actual" mutations.
+ indexKeyToMutationMapLocal = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMapLocal);
+ rebuildScanner.prepareIndexMutations(put, delete);
+
+ // Second pass: an independent map, left installed on the scanner for the tests to run against.
+ Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+ rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+ rebuildScanner.prepareIndexMutations(put, delete);
+ }
+
+    /**
+     * Returns the first key of indexKeyToMutationMapLocal, i.e. an index row key for which
+     * expected mutations were generated. Throws NoSuchElementException if the map is empty.
+     */
+    private byte[] getValidRowKey() {
+        Iterator<byte[]> keys = indexKeyToMutationMapLocal.keySet().iterator();
+        return keys.next();
+    }
+
+    /** An index row whose actual mutations equal the expected ones counts as one valid row. */
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_nonZero() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_EXACT_MATCH);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    /** Extra unverified puts and deletes appended after the expected mutations still verify as valid. */
+    @Test
+    public void testVerifySingleIndexRow_validIndexRowCount_moreActual() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_MORE_MUTATIONS);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    /** A mix of unverified puts and a delete ahead of the expected mutations still verifies as valid. */
+    @Test
+    public void testVerifySingleIndexRow_allMix() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_MIX_MUTATIONS);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    /** Several unverified puts ahead of the expected mutations still verify as valid. */
+    @Test
+    public void testVerifySingleIndexRow_allUnverified() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.VALID_NEW_UNVERIFIED_MUTATIONS);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    /**
+     * With a short index TTL and an injected test clock, each index row is counted as expired.
+     * The injected clock is static state, so it is always reset before the test returns.
+     */
+    @Test
+    public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR =
+                new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0);
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        try {
+            for (Map.Entry<byte[], List<Mutation>> entry
+                    : indexKeyToMutationMapLocal.entrySet()) {
+                initializeLocalMockitoSetup(entry, TestType.EXPIRED);
+                expireThisRow();
+                rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+                assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                        actualPR.equals(expectedPR));
+            }
+        } finally {
+            // expireThisRow() injects a UnitTestClock into the static EnvironmentEdgeManager;
+            // restore the default edge so later tests in this JVM do not inherit the fake clock.
+            EnvironmentEdgeManager.reset();
+        }
+    }
+
+    /** Replacing the empty-column cell (INVALID_CELL_VALUE) makes each row count as invalid. */
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_CELL_VALUE);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    /** Turning the empty-column cell into a Delete-typed cell makes each row count as invalid. */
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_EMPTY_CELL);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    /** An unexpected extra column in the actual row makes each row count as invalid. */
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_diffColumn() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_COLUMN);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+    /** Extra included-column and empty-column cells in the actual row make it count as invalid. */
+    @Test
+    public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException {
+        IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+        // Guard against a vacuous pass: the loop below must run at least once.
+        assertTrue("No index mutations were generated for the test row",
+                !indexKeyToMutationMapLocal.isEmpty());
+        for (Map.Entry<byte[], List<Mutation>> entry : indexKeyToMutationMapLocal.entrySet()) {
+            initializeLocalMockitoSetup(entry, TestType.INVALID_EXTRA_CELL);
+            rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+            assertTrue("Expected " + expectedPR + " but was " + actualPR,
+                    actualPR.equals(expectedPR));
+        }
+    }
+
+ @Test
+ public void testVerifySingleIndexRow_expectedMutations_null() throws IOException {
+ when(indexRow.getRow()).thenReturn(Bytes.toBytes(1));
+ exceptionRule.expect(DoNotRetryIOException.class);
+ exceptionRule.expectMessage(IndexRebuildRegionScanner.NO_EXPECTED_MUTATION);
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+ }
+
+    /** A null list of actual index mutations must fail with ACTUAL_MUTATION_IS_NULL_OR_EMPTY. */
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_null() throws IOException {
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        byte[] knownIndexRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(knownIndexRowKey);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+    /** An empty list of actual index mutations must fail with ACTUAL_MUTATION_IS_NULL_OR_EMPTY. */
+    @Test
+    public void testVerifySingleIndexRow_actualMutations_empty() throws IOException {
+        exceptionRule.expect(DoNotRetryIOException.class);
+        exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+        byte[] knownIndexRowKey = getValidRowKey();
+        when(indexRow.getRow()).thenReturn(knownIndexRowKey);
+        actualMutationList = Lists.newArrayList();
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+        rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+    }
+
+ private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
+ return new IndexToolVerificationResult.PhaseResult(1,0,0,0);
+ }
+
+ private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
+ return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1);
+ }
+
+    /**
+     * Prepares one loop iteration: resets {@code actualPR}, makes the mocked index row report
+     * {@code entry}'s key, and stubs the scanner to return the "actual" mutations built for
+     * {@code testType}.
+     */
+    private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
+            TestType testType) throws IOException {
+        actualPR = new IndexToolVerificationResult.PhaseResult();
+        when(indexRow.getRow()).thenReturn(entry.getKey());
+        // Must follow the getRow() stub: the builder looks mutations up by indexRow.getRow().
+        actualMutationList = buildActualIndexMutationsList(testType);
+        when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+    }
+
+    /**
+     * Returns the "actual" index mutations for the current index row: a copy of its expected
+     * mutations, unchanged for EXPIRED and otherwise reshaped by the VALID* / INVALID* builders
+     * (chosen by the enum's string prefix). Returns null for any other test type.
+     */
+    private List<Mutation> buildActualIndexMutationsList(TestType testType) {
+        List<Mutation> expectedCopy =
+                new ArrayList<>(indexKeyToMutationMapLocal.get(indexRow.getRow()));
+        if (testType == TestType.EXPIRED) {
+            return expectedCopy;
+        }
+        String typeName = testType.toString();
+        if (typeName.startsWith("VALID")) {
+            return getValidActualMutations(testType, expectedCopy);
+        } else if (typeName.startsWith("INVALID")) {
+            return getInvalidActualMutations(testType, expectedCopy);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Builds "actual" index mutations that are still expected to verify as valid.
+     * <ul>
+     * <li>VALID_EXACT_MATCH: returns {@code actualMutations} itself.</li>
+     * <li>VALID_MIX_MUTATIONS: prepends an unverified put (now), a delete at timestamp 1 and
+     * another unverified put (now).</li>
+     * <li>VALID_NEW_UNVERIFIED_MUTATIONS: prepends three unverified puts (now) and one at
+     * timestamp 1.</li>
+     * <li>VALID_MORE_MUTATIONS: appends an unverified put (now), deletes at now and at
+     * timestamp 1, and an unverified put at timestamp 1.</li>
+     * </ul>
+     * Synthesized mutations are modeled on the first element of {@code actualMutations}.
+     */
+    private List<Mutation> getValidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        if (testType == TestType.VALID_EXACT_MATCH) {
+            return actualMutations;
+        }
+        List<Mutation> newActualMutations = new ArrayList<>();
+        // Use 1L (autoboxed via Long.valueOf) instead of the deprecated new Long(1) constructor.
+        if (testType == TestType.VALID_MIX_MUTATIONS) {
+            Mutation template = actualMutations.get(0);
+            newActualMutations.add(getUnverifiedPutMutation(template, null));
+            newActualMutations.add(getDeleteMutation(template, 1L));
+            newActualMutations.add(getUnverifiedPutMutation(template, null));
+        }
+        if (testType == TestType.VALID_NEW_UNVERIFIED_MUTATIONS) {
+            Mutation template = actualMutations.get(0);
+            newActualMutations.add(getUnverifiedPutMutation(template, null));
+            newActualMutations.add(getUnverifiedPutMutation(template, null));
+            newActualMutations.add(getUnverifiedPutMutation(template, null));
+            newActualMutations.add(getUnverifiedPutMutation(template, 1L));
+        }
+        newActualMutations.addAll(actualMutations);
+        if (testType == TestType.VALID_MORE_MUTATIONS) {
+            Mutation template = actualMutations.get(0);
+            newActualMutations.add(getUnverifiedPutMutation(template, null));
+            newActualMutations.add(getDeleteMutation(template, null));
+            newActualMutations.add(getDeleteMutation(template, 1L));
+            newActualMutations.add(getUnverifiedPutMutation(template, 1L));
+        }
+        return newActualMutations;
+    }
+
+    /**
+     * Returns a new list holding the given mutations in their original order, after rewriting
+     * the cells of each mutation's first column family through infiltrateCell for
+     * {@code testType}.
+     *
+     * The Mutation objects are modified in place, so any other collection that holds the same
+     * objects (for example indexKeyToMutationMapLocal) observes the change too.
+     *
+     * The original implementation removed and re-appended every element, an O(n^2) rotation
+     * that depended on Mutation.equals and left the order unchanged. It is replaced by a plain
+     * copy.
+     */
+    private List<Mutation> getInvalidActualMutations(TestType testType,
+            List<Mutation> actualMutations) {
+        for (Mutation m : actualMutations) {
+            NavigableMap<byte[], List<Cell>> familyCellMap = m.getFamilyCellMap();
+            List<Cell> cellList = familyCellMap.firstEntry().getValue();
+            List<Cell> newCellList = new ArrayList<>();
+            byte[] fam = CellUtil.cloneFamily(cellList.get(0));
+            for (Cell c : cellList) {
+                infiltrateCell(c, newCellList, testType);
+            }
+            familyCellMap.put(fam, newCellList);
+            m.setFamilyCellMap(familyCellMap);
+        }
+        return new ArrayList<>(actualMutations);
+    }
+
+    /**
+     * Appends to {@code newCellList} the cell(s) that stand in for {@code c} under test type
+     * {@code e}:
+     * <ul>
+     * <li>INVALID_COLUMN: an UNEXPECTED_COLUMN Put (stamped now), then {@code c}.</li>
+     * <li>INVALID_CELL_VALUE: an empty-column cell becomes an INCLUDED_COLUMN Put plus a
+     * VERIFIED empty cell; other cells are kept.</li>
+     * <li>INVALID_EMPTY_CELL: an empty-column cell becomes a Delete-typed cell with
+     * VERIFIED_BYTES; other cells are kept.</li>
+     * <li>INVALID_EXTRA_CELL: an INCLUDED_COLUMN Put and a VERIFIED empty cell, then
+     * {@code c}.</li>
+     * </ul>
+     * Any other type appends nothing, i.e. {@code c} is dropped.
+     */
+    private void infiltrateCell(Cell c, List<Cell> newCellList, TestType e) {
+        switch (e) {
+            case INVALID_COLUMN: {
+                Cell unexpectedColumnCell = CellUtil.createCell(CellUtil.cloneRow(c),
+                        CellUtil.cloneFamily(c), Bytes.toBytes(UNEXPECTED_COLUMN),
+                        EnvironmentEdgeManager.currentTimeMillis(),
+                        KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv"));
+                newCellList.add(unexpectedColumnCell);
+                newCellList.add(c);
+                return;
+            }
+            case INVALID_CELL_VALUE: {
+                if (!CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                    newCellList.add(c);
+                    return;
+                }
+                Cell includedColumnPut = getCellWithPut(c);
+                Cell verifiedEmptyCell = getVerifiedEmptyCell(c);
+                newCellList.add(includedColumnPut);
+                newCellList.add(verifiedEmptyCell);
+                return;
+            }
+            case INVALID_EMPTY_CELL: {
+                if (!CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+                    newCellList.add(c);
+                    return;
+                }
+                Cell deleteTypedEmptyCell = CellUtil.createCell(CellUtil.cloneRow(c),
+                        CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c), c.getTimestamp(),
+                        KeyValue.Type.Delete.getCode(), VERIFIED_BYTES);
+                newCellList.add(deleteTypedEmptyCell);
+                return;
+            }
+            case INVALID_EXTRA_CELL: {
+                Cell includedColumnPut = getCellWithPut(c);
+                Cell verifiedEmptyCell = getVerifiedEmptyCell(c);
+                newCellList.add(includedColumnPut);
+                newCellList.add(verifiedEmptyCell);
+                newCellList.add(c);
+                return;
+            }
+            default:
+                // Other test types contribute no cells.
+                return;
+        }
+    }
+
+    /**
+     * Builds a Put cell in {@code c}'s row and family for the index's empty-column qualifier,
+     * stamped with the current EnvironmentEdgeManager time and holding VERIFIED_BYTES.
+     */
+    private Cell getVerifiedEmptyCell(Cell c) {
+        byte[] row = CellUtil.cloneRow(c);
+        byte[] family = CellUtil.cloneFamily(c);
+        byte[] emptyQualifier = indexMaintainer.getEmptyKeyValueQualifier();
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        return CellUtil.createCell(row, family, emptyQualifier, now,
+                KeyValue.Type.Put.getCode(), VERIFIED_BYTES);
+    }
+
+    /**
+     * Builds a Put cell in {@code c}'s row and family for INCLUDED_COLUMN, reusing
+     * {@code c}'s timestamp, with the value "zxcv".
+     */
+    private Cell getCellWithPut(Cell c) {
+        byte[] row = CellUtil.cloneRow(c);
+        byte[] family = CellUtil.cloneFamily(c);
+        byte[] qualifier = Bytes.toBytes(INCLUDED_COLUMN);
+        byte[] value = Bytes.toBytes("zxcv");
+        return CellUtil.createCell(row, family, qualifier, c.getTimestamp(),
+                KeyValue.Type.Put.getCode(), value);
+    }
+
+    /**
+     * Sets the scanner's index-table TTL to INDEX_TABLE_EXPIRY_SEC and injects a UnitTestClock
+     * constructed with 5000 as the global EnvironmentEdgeManager clock.
+     *
+     * NOTE(review): the injected edge is process-wide and is not reset here; callers should
+     * restore it with EnvironmentEdgeManager.reset() when done.
+     */
+    private void expireThisRow() {
+        rebuildScanner.setIndexTableTTL(INDEX_TABLE_EXPIRY_SEC);
+        EnvironmentEdgeManager.injectEdge(new UnitTestClock(5000));
+    }
+
+    /**
+     * Builds a Delete on {@code orig}'s row. Under the family of {@code orig}'s first cell it
+     * holds a DeleteFamilyVersion cell for INCLUDED_COLUMN and a VERIFIED empty-column cell
+     * typed Put. Both cells are stamped with {@code ts}, or with the current
+     * EnvironmentEdgeManager time when {@code ts} is null.
+     */
+    private Mutation getDeleteMutation(Mutation orig, Long ts) {
+        List<Cell> origCells = orig.getFamilyCellMap().firstEntry().getValue();
+        long cellTs = (ts != null) ? ts : EnvironmentEdgeManager.currentTimeMillis();
+        Cell familyVersionMarker =
+                getNewPutCell(orig, origCells, cellTs, KeyValue.Type.DeleteFamilyVersion);
+        Cell verifiedEmptyCell = getEmptyCell(orig, origCells, cellTs, KeyValue.Type.Put, true);
+        byte[] family = CellUtil.cloneFamily(origCells.get(0));
+
+        Mutation deleteMutation = new Delete(orig.getRow());
+        List<Cell> familyCells = Lists.newArrayList(familyVersionMarker, verifiedEmptyCell);
+        deleteMutation.getFamilyCellMap().put(family, familyCells);
+        return deleteMutation;
+    }
+
+    /**
+     * Builds a Put on {@code orig}'s row that copies {@code orig}'s attributes. Under the family
+     * of {@code orig}'s first cell it holds an INCLUDED_COLUMN Put cell and an UNVERIFIED
+     * empty-column cell, both stamped with {@code ts}, or with the current EnvironmentEdgeManager
+     * time when {@code ts} is null.
+     */
+    private Mutation getUnverifiedPutMutation(Mutation orig, Long ts) {
+        Mutation unverifiedPut = new Put(orig.getRow());
+        Map<String, byte[]> attributes = orig.getAttributesMap();
+        if (attributes != null) {
+            for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
+                unverifiedPut.setAttribute(attribute.getKey(), attribute.getValue());
+            }
+        }
+        List<Cell> origCells = orig.getFamilyCellMap().firstEntry().getValue();
+        long cellTs = (ts != null) ? ts : EnvironmentEdgeManager.currentTimeMillis();
+        Cell includedCell = getNewPutCell(orig, origCells, cellTs, KeyValue.Type.Put);
+        Cell unverifiedEmptyCell =
+                getEmptyCell(orig, origCells, cellTs, KeyValue.Type.Put, false);
+        byte[] family = CellUtil.cloneFamily(origCells.get(0));
+
+        List<Cell> familyCells = Lists.newArrayList(includedCell, unverifiedEmptyCell);
+        unverifiedPut.getFamilyCellMap().put(family, familyCells);
+        return unverifiedPut;
+    }
+
+    /**
+     * Builds an empty-column cell of the given {@code type} on {@code orig}'s row, in the family
+     * of {@code origList}'s first cell, holding VERIFIED_BYTES or UNVERIFIED_BYTES.
+     */
+    private Cell getEmptyCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type,
+            boolean verified) {
+        byte[] family = CellUtil.cloneFamily(origList.get(0));
+        byte[] emptyQualifier = indexMaintainer.getEmptyKeyValueQualifier();
+        byte[] status = verified ? VERIFIED_BYTES : UNVERIFIED_BYTES;
+        return CellUtil.createCell(orig.getRow(), family, emptyQualifier, ts, type.getCode(),
+                status);
+    }
+
+    /**
+     * Builds an INCLUDED_COLUMN cell of the given {@code type} on {@code orig}'s row, in the
+     * family of {@code origList}'s first cell, with the value "asdfg".
+     */
+    private Cell getNewPutCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type) {
+        byte[] family = CellUtil.cloneFamily(origList.get(0));
+        byte[] qualifier = Bytes.toBytes(INCLUDED_COLUMN);
+        byte[] value = Bytes.toBytes("asdfg");
+        return CellUtil.createCell(orig.getRow(), family, qualifier, ts, type.getCode(), value);
+    }
+}