You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/01/27 09:14:36 UTC
[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5694 Add MR job
counters for IndexTool inline verification
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
new 7cd4f85 PHOENIX-5694 Add MR job counters for IndexTool inline verification
7cd4f85 is described below
commit 7cd4f85eebee81abf3fe26acd97d12b47b12f981
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Jan 23 18:50:39 2020 -0800
PHOENIX-5694 Add MR job counters for IndexTool inline verification
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 137 +++--
.../phoenix/compile/ServerBuildIndexCompiler.java | 24 +-
.../coprocessor/IndexRebuildRegionScanner.java | 608 +++++++++++++++------
.../PhoenixServerBuildIndexInputFormat.java | 2 +
.../apache/phoenix/mapreduce/index/IndexTool.java | 59 +-
.../index/PhoenixIndexImportDirectReducer.java | 81 ++-
.../index/PhoenixIndexToolJobCounters.java | 35 ++
.../java/org/apache/phoenix/util/IndexUtil.java | 2 +-
8 files changed, 707 insertions(+), 241 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index b3b1613..8c3e89b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -17,28 +17,8 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -87,8 +67,39 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
@@ -179,8 +190,8 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
@Test
public void testWithSetNull() throws Exception {
- // This test is for building non-transactional mutable global indexes with direct api
- if (localIndex || transactional || !mutable || !directApi || useSnapshot) {
+ // This test is for building non-transactional global indexes with direct api
+ if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
// This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
@@ -322,10 +333,20 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
}
}
+ /**
+  * Disables and then deletes the two HBase tables that IndexTool writes to, the output
+  * table and the result table, so that a later test starts without rows left by this one.
+  *
+  * @param conn an open Phoenix connection; it is unwrapped to reach the HBase admin
+  * @throws Exception if the connection cannot be unwrapped or an admin operation fails
+  */
+ private void dropIndexToolTables(Connection conn) throws Exception {
+     // The Admin handle is Closeable; try-with-resources releases it even when
+     // disabling or deleting a table throws.
+     try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+         TableName[] indexToolTables = {
+             TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES),
+             TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES)
+         };
+         for (TableName indexToolTable : indexToolTables) {
+             // A table must be disabled before HBase allows it to be deleted.
+             admin.disableTable(indexToolTable);
+             admin.deleteTable(indexToolTable);
+         }
+     }
+ }
+
@Test
public void testBuildSecondaryIndexAndScrutinize() throws Exception {
// This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot) {
+ if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
String schemaName = generateUniqueName();
@@ -356,8 +377,14 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
conn.createStatement().execute(stmtString2);
// Run the index MR job and verify that the index table is built correctly
- IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+ IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+ assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
@@ -366,10 +393,21 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
upsertRow(stmt1, i);
}
conn.commit();
- indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+ indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(2 * NROWS, actualRowCount);
+ dropIndexToolTables(conn);
}
}
@@ -402,8 +440,7 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
boolean dataTableNameCheck = false;
boolean indexTableNameCheck = false;
Cell errorMessageCell = null;
- Result result = scanner.next();
- if (result != null) {
+ for (Result result = scanner.next(); result != null; result = scanner.next()) {
for (Cell cell : result.rawCells()) {
assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0,
@@ -431,7 +468,7 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
@Test
public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
// This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot) {
+ if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -468,17 +505,14 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
null, 0, IndexTool.IndexVerifyType.BOTH);
assertEquals(0, MutationCountingRegionObserver.getMutationCount());
- Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
- admin.disableTable(indexToolOutputTable);
- admin.deleteTable(indexToolOutputTable);
+ dropIndexToolTables(conn);
}
}
@Test
public void testIndexToolVerifyAfterOption() throws Exception {
// This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot) {
+ if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -512,17 +546,14 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
expectedValueBytes, 0, expectedValueBytes.length) == 0);
IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
- Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
- admin.disableTable(indexToolOutputTable);
- admin.deleteTable(indexToolOutputTable);
+ dropIndexToolTables(conn);
}
}
@Test
public void testIndexToolOnlyVerifyOption() throws Exception {
// This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot) {
+ if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
String schemaName = generateUniqueName();
@@ -543,16 +574,13 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
// IndexTool will go through each data table row and record the mismatches in the output table
// called PHOENIX_INDEX_TOOL
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.ONLY);
+ null, -1, IndexTool.IndexVerifyType.ONLY);
Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
expectedValueBytes, 0, expectedValueBytes.length) == 0);
// Delete the output table for the next test
- Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
- admin.disableTable(indexToolOutputTable);
- admin.deleteTable(indexToolOutputTable);
+ dropIndexToolTables(conn);
// Run the index tool to populate the index while verifying rows
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.AFTER);
@@ -561,19 +589,19 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
conn.commit();
// Run the index tool using the only-verify option to detect this mismatch between the data and index table
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.ONLY);
+ null, -1, IndexTool.IndexVerifyType.ONLY);
cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
expectedValueBytes, 0, expectedValueBytes.length) == 0);
- admin.disableTable(indexToolOutputTable);
- admin.deleteTable(indexToolOutputTable);
+ dropIndexToolTables(conn);
}
}
@Test
public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
- if (localIndex || transactional || !directApi || useSnapshot) {
+ // This test is for building non-transactional global indexes with direct api
+ if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
String schemaName = generateUniqueName();
@@ -592,7 +620,7 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
conn.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
indexTableName, dataTableFullName));
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, -1,
IndexTool.IndexVerifyType.ONLY);
Cell cell =
getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
@@ -632,8 +660,7 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
.getTable(indexToolOutputTable.getName());
Result r = hIndexToolTable.getScanner(scan).next();
assertTrue(r == null);
- admin.disableTable(indexToolOutputTable);
- admin.deleteTable(indexToolOutputTable);
+ dropIndexToolTables(conn);
}
}
@@ -724,7 +751,7 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
@Test
public void testSecondaryGlobalIndexFailure() throws Exception {
// This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot) {
+ if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
String schemaName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 1b482aa..4392e23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -35,9 +35,11 @@ import org.apache.phoenix.schema.*;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
/**
@@ -94,6 +96,16 @@ public class ServerBuildIndexCompiler {
throw new IllegalArgumentException(
"ServerBuildIndexCompiler does not support global indexes on transactional tables");
}
+ // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+ // However, in this case, we need to project all of the data columns that contribute to the index.
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+ for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+ if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+ scan.addFamily(columnRef.getFamily());
+ } else {
+ scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+ }
+ }
IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
// Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
// For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
@@ -109,18 +121,8 @@ public class ServerBuildIndexCompiler {
scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+ addEmptyColumnToScan(scan, indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
}
- // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
- // However, in this case, we need to project all of the data columns that contribute to the index.
- IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
- for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
- if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
- scan.addFamily(columnRef.getFamily());
- } else {
- scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
- }
- }
-
if (dataTable.isTransactional()) {
scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 527d78c..318440f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -20,6 +20,17 @@ package org.apache.phoenix.coprocessor;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -30,6 +41,7 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -40,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
@@ -90,6 +103,238 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class IndexRebuildRegionScanner extends BaseRegionScanner {
+
+ /**
+  * Accumulates the row counters produced by index verification: the number of data rows
+  * scanned, the number of index rows rebuilt, and one {@link PhaseResult} for the
+  * verification done before the rebuild and one for the verification done after it.
+  * Instances can be populated from the cells of the IndexTool result table via
+  * {@link #getVerificationResult(Table, long)} or merged with {@link #add(VerificationResult)}.
+  */
+ public static class VerificationResult {
+     /**
+      * Counters for a single verification phase, split by the outcome recorded for each
+      * index row: valid, expired, missing or invalid.
+      */
+     public static class PhaseResult {
+         private long validIndexRowCount = 0;
+         private long expiredIndexRowCount = 0;
+         private long missingIndexRowCount = 0;
+         private long invalidIndexRowCount = 0;
+
+         /**
+          * Adds every counter of {@code phaseResult} to the matching counter of this object.
+          *
+          * @param phaseResult the counters to merge in; it is not modified
+          */
+         public void add(PhaseResult phaseResult) {
+             validIndexRowCount += phaseResult.validIndexRowCount;
+             expiredIndexRowCount += phaseResult.expiredIndexRowCount;
+             missingIndexRowCount += phaseResult.missingIndexRowCount;
+             invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+         }
+
+         /**
+          * @return the sum of the valid, expired, missing and invalid counters
+          */
+         public long getTotalCount() {
+             return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+         }
+
+         @Override
+         public String toString() {
+             return "PhaseResult{" +
+                     "validIndexRowCount=" + validIndexRowCount +
+                     ", expiredIndexRowCount=" + expiredIndexRowCount +
+                     ", missingIndexRowCount=" + missingIndexRowCount +
+                     ", invalidIndexRowCount=" + invalidIndexRowCount +
+                     '}';
+         }
+     }
+
+     private long scannedDataRowCount = 0;
+     private long rebuiltIndexRowCount = 0;
+     // The phase holders are only ever mutated in place, never replaced.
+     private final PhaseResult before = new PhaseResult();
+     private final PhaseResult after = new PhaseResult();
+
+     @Override
+     public String toString() {
+         return "VerificationResult{" +
+                 "scannedDataRowCount=" + scannedDataRowCount +
+                 ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
+                 ", before=" + before +
+                 ", after=" + after +
+                 '}';
+     }
+
+     public long getScannedDataRowCount() {
+         return scannedDataRowCount;
+     }
+
+     public long getRebuiltIndexRowCount() {
+         return rebuiltIndexRowCount;
+     }
+
+     public long getBeforeRebuildValidIndexRowCount() {
+         return before.validIndexRowCount;
+     }
+
+     public long getBeforeRebuildExpiredIndexRowCount() {
+         return before.expiredIndexRowCount;
+     }
+
+     public long getBeforeRebuildInvalidIndexRowCount() {
+         return before.invalidIndexRowCount;
+     }
+
+     public long getBeforeRebuildMissingIndexRowCount() {
+         return before.missingIndexRowCount;
+     }
+
+     public long getAfterRebuildValidIndexRowCount() {
+         return after.validIndexRowCount;
+     }
+
+     public long getAfterRebuildExpiredIndexRowCount() {
+         return after.expiredIndexRowCount;
+     }
+
+     public long getAfterRebuildInvalidIndexRowCount() {
+         return after.invalidIndexRowCount;
+     }
+
+     public long getAfterRebuildMissingIndexRowCount() {
+         return after.missingIndexRowCount;
+     }
+
+     private void addScannedDataRowCount(long count) {
+         this.scannedDataRowCount += count;
+     }
+
+     private void addRebuiltIndexRowCount(long count) {
+         this.rebuiltIndexRowCount += count;
+     }
+
+     private void addBeforeRebuildValidIndexRowCount(long count) {
+         before.validIndexRowCount += count;
+     }
+
+     private void addBeforeRebuildExpiredIndexRowCount(long count) {
+         before.expiredIndexRowCount += count;
+     }
+
+     private void addBeforeRebuildMissingIndexRowCount(long count) {
+         before.missingIndexRowCount += count;
+     }
+
+     private void addBeforeRebuildInvalidIndexRowCount(long count) {
+         before.invalidIndexRowCount += count;
+     }
+
+     private void addAfterRebuildValidIndexRowCount(long count) {
+         after.validIndexRowCount += count;
+     }
+
+     private void addAfterRebuildExpiredIndexRowCount(long count) {
+         after.expiredIndexRowCount += count;
+     }
+
+     private void addAfterRebuildMissingIndexRowCount(long count) {
+         after.missingIndexRowCount += count;
+     }
+
+     private void addAfterRebuildInvalidIndexRowCount(long count) {
+         after.invalidIndexRowCount += count;
+     }
+
+     /**
+      * @return true when the qualifier of {@code cell} equals
+      *         {@code AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES}; the column family is not checked
+      */
+     private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
+         return Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                 AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
+                 AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0;
+     }
+
+     /**
+      * Decodes a counter cell. The value bytes are read as a string and parsed as a decimal
+      * long, so the counters are expected to be stored as text rather than as 8-byte longs.
+      *
+      * @throws NumberFormatException if the cell value is not a parsable long
+      */
+     private long getValue(Cell cell) {
+         return Long.parseLong(Bytes.toString(cell.getValueArray(),
+                 cell.getValueOffset(), cell.getValueLength()));
+     }
+
+     /**
+      * Adds the value of {@code cell} to the counter selected by the cell's column in
+      * {@code RESULT_TABLE_COLUMN_FAMILY}. Cells of any other column are ignored.
+      */
+     private void update(Cell cell) {
+         if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
+             addScannedDataRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
+             addRebuiltIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+             addBeforeRebuildValidIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+             addBeforeRebuildExpiredIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+             addBeforeRebuildMissingIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+             addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+             addAfterRebuildValidIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+             addAfterRebuildExpiredIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+             addAfterRebuildMissingIndexRowCount(getValue(cell));
+         } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+             addAfterRebuildInvalidIndexRowCount(getValue(cell));
+         }
+     }
+
+     /**
+      * Computes the smallest row key that sorts after every key starting with
+      * {@code rowKeyPrefix}, for use as an exclusive scan stop row.
+      *
+      * @param rowKeyPrefix the prefix shared by the rows of interest; it is not modified
+      * @return the prefix with trailing 0xFF bytes dropped and its last remaining byte
+      *         incremented, or {@code HConstants.EMPTY_END_ROW} when the prefix is empty
+      *         or consists only of 0xFF bytes
+      */
+     public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+         // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+         // Search for the place where the trailing 0xFFs start
+         int offset = rowKeyPrefix.length;
+         while (offset > 0) {
+             if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+                 break;
+             }
+             offset--;
+         }
+         if (offset == 0) {
+             // We got an 0xFFFF... (only FFs) stopRow value which is
+             // the last possible prefix before the end of the table.
+             // So set it to stop at the 'end of the table'
+             return HConstants.EMPTY_END_ROW;
+         }
+         // Copy the right length of the original
+         byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+         // And increment the last one
+         newStopRow[newStopRow.length - 1]++;
+         return newStopRow;
+     }
+
+     /**
+      * Scans {@code hTable} for every row whose key starts with the decimal string form
+      * of {@code ts} and sums the counter cells of those rows into one result.
+      *
+      * @param hTable the table to read the per-row counters from
+      * @param ts the timestamp whose decimal string is the row key prefix
+      * @return the aggregated counters; all zero when no row matches
+      * @throws IOException if opening or reading the scanner fails
+      */
+     public static VerificationResult getVerificationResult(Table hTable, long ts)
+             throws IOException {
+         VerificationResult verificationResult = new VerificationResult();
+         byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+         byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
+         Scan scan = new Scan();
+         scan.setStartRow(startRowKey);
+         scan.setStopRow(stopRowKey);
+         // Close the scanner even when a read or a counter parse fails, so that the
+         // scanner resources are not left open.
+         try (ResultScanner scanner = hTable.getScanner(scan)) {
+             for (Result result = scanner.next(); result != null; result = scanner.next()) {
+                 for (Cell cell : result.rawCells()) {
+                     verificationResult.update(cell);
+                 }
+             }
+         }
+         return verificationResult;
+     }
+
+     /**
+      * Decides from the accumulated counters whether verification failed for the given
+      * verify type.
+      *
+      * @param verifyType the verification mode the counters were collected under
+      * @return always false for BEFORE and NONE; for ONLY, true when the valid plus expired
+      *         "before" counts differ from the scanned data row count; for BOTH and AFTER,
+      *         true when any "after" row is invalid or missing, or when the valid plus
+      *         expired counts of both phases do not add up to the scanned data row count
+      */
+     public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
+         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
+             return false;
+         }
+         if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+             if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) {
+                 return true;
+             }
+         }
+         if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+             if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
+                 return true;
+             }
+             if (before.validIndexRowCount + before.expiredIndexRowCount +
+                     after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) {
+                 return true;
+             }
+         }
+         return false;
+     }
+
+     /**
+      * Adds every counter of {@code verificationResult}, including both phase results,
+      * to this object.
+      *
+      * @param verificationResult the counters to merge in; it is not modified
+      */
+     public void add(VerificationResult verificationResult) {
+         scannedDataRowCount += verificationResult.scannedDataRowCount;
+         rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
+         before.add(verificationResult.before);
+         after.add(verificationResult.after);
+     }
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
@@ -112,9 +357,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private byte[] indexRowKey = null;
private Table indexHTable = null;
private Table outputHTable = null;
+ private Table resultHTable = null;
private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
- private boolean doNotFail = false;
private Map<byte[], Put> indexKeyToDataPutMap;
private Map<byte[], Put> dataKeyToDataPutMap;
private TaskRunner pool;
@@ -124,6 +369,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private RegionCoprocessorEnvironment env;
private HTableFactory hTableFactory;
private int indexTableTTL;
+ private VerificationResult verificationResult;
+ private boolean isBeforeRebuilt = true;
IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env,
@@ -158,6 +405,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
+ verificationResult = new VerificationResult();
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
@@ -165,7 +413,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
- outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+ outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+ resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
@@ -173,7 +422,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
- tasks = new TaskBatch<>(DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS);
rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
}
@@ -188,14 +436,58 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
@Override
public boolean isFilterDone() { return hasMore; }
+ /**
+ * Writes a single summary row for this region scan to the index tool result table.
+ * The row carries the scan stop row, the scanned/rebuilt row counts and, depending on
+ * {@code verifyType}, the per-phase (before/after rebuild) verification counters.
+ * All counters are stored as the UTF-8 bytes of their decimal string form, and every
+ * cell is written with the scan's max timestamp.
+ *
+ * @throws IOException if the put to the result table fails
+ */
+ private void logToIndexToolResultTable() throws IOException {
+ final long maxScanTs = scan.getTimeRange().getMax();
+ final byte[] tsPrefix = Bytes.toBytes(Long.toString(maxScanTs));
+ final byte[] regionNameBytes = Bytes.toBytes(region.getRegionInfo().getRegionNameAsString());
+ // The row key for the result table is the max timestamp of the scan + the table region name
+ final byte[] resultRowKey = new byte[tsPrefix.length + regionNameBytes.length];
+ System.arraycopy(tsPrefix, 0, resultRowKey, 0, tsPrefix.length);
+ System.arraycopy(regionNameBytes, 0, resultRowKey, tsPrefix.length, regionNameBytes.length);
+ final boolean reportsBeforePhase = verifyType == IndexTool.IndexVerifyType.BEFORE
+ || verifyType == IndexTool.IndexVerifyType.BOTH
+ || verifyType == IndexTool.IndexVerifyType.ONLY;
+ final boolean reportsAfterPhase = verifyType == IndexTool.IndexVerifyType.AFTER
+ || verifyType == IndexTool.IndexVerifyType.BOTH;
+ final Put summary = new Put(resultRowKey);
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, IndexTool.SCAN_STOP_ROW_KEY_BYTES,
+ maxScanTs, scan.getStopRow());
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount)));
+ if (reportsBeforePhase) {
+ // Counters collected by the verification pass that ran before the rebuild
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount)));
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount)));
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
+ }
+ if (reportsAfterPhase) {
+ // Counters collected by the verification pass that ran after the rebuild
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount)));
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount)));
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
+ summary.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+ maxScanTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
+ }
+ resultHTable.put(summary);
+ }
+
+ /**
+ * Closes the inner scanner and, when verification is enabled, writes the per-region
+ * summary row to the result table before releasing the verification resources.
+ * The task pool is stopped and the table factory shut down even if writing the summary
+ * fails, and each table is closed in its own {@code finally} so that a failure while
+ * closing one table does not leak the remaining ones.
+ *
+ * @throws IOException if closing the inner scanner, writing the summary row or closing
+ * one of the tables fails
+ */
@Override
public void close() throws IOException {
innerScanner.close();
if (verify) {
- this.pool.stop("IndexRebuildRegionScanner is closing");
- hTableFactory.shutdown();
- indexHTable.close();
- outputHTable.close();
+ try {
+ logToIndexToolResultTable();
+ } finally {
+ this.pool.stop("IndexRebuildRegionScanner is closing");
+ hTableFactory.shutdown();
+ // Nested finally blocks: every table gets a close attempt even if an earlier close throws
+ try {
+ indexHTable.close();
+ } finally {
+ try {
+ outputHTable.close();
+ } finally {
+ resultHTable.close();
+ }
+ }
+ }
}
}
@@ -213,7 +505,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
int cellCount = put.size();
if (cellCount == allColumns.size() + 1) {
- // We have all the columns for the index table plus the empty column. So, no delete marker is needed
+ // We have all the columns for the index table. So, no delete marker is needed
return null;
}
Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
@@ -241,14 +533,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return del;
}
- private void addToBeVerifiedIndexRows() throws IOException {
- for (Mutation mutation : mutations) {
- if (mutation instanceof Put) {
- indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
- }
- }
- }
-
private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
ungroupedAggregateRegionObserver.checkForRegionClosing();
@@ -300,69 +584,72 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
+ /**
+ * Logs a verification error without expected/actual cell values; delegates to the
+ * seven-argument overload with {@code null} for both values.
+ *
+ * @throws IOException if the put to the output table fails
+ */
private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
- String errorMsg) {
+ String errorMsg) throws IOException {
logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, null, null);
}
+ /**
+ * Writes one error row to the index tool output table. The row key is the decimal string
+ * of the scan's max timestamp followed by the data row key (when one is given). The row
+ * records the data and index table names, the data/index row timestamps, the index row key,
+ * the error message (with " E:" expected and " A:" actual values appended when
+ * {@code expectedValue} is not null) and the verification phase ("BEFORE" or "AFTER",
+ * chosen from {@code isBeforeRebuilt}).
+ *
+ * @param dataRowKey data table row key, or null when there is no data row
+ * @param indexRowKey index table row key, or null when there is no index row
+ * @param dataRowTs timestamp of the data row; written only when dataRowKey is not null
+ * @param indexRowTs timestamp of the index row; written only when indexRowKey is not null
+ * @param errorMsg description of the verification failure
+ * @param expectedValue expected cell value, or null if not applicable
+ * @param actualValue actual cell value; must be non-null when expectedValue is non-null
+ * @throws IOException if the put to the output table fails
+ */
private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
- String errorMsg, byte[] expectedValue, byte[] actualValue) {
+ String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
final int PREFIX_LENGTH = 3;
final int TOTAL_PREFIX_LENGTH = 6;
+ final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
+ final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
+ long scanMaxTs = scan.getTimeRange().getMax();
+ byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
+ byte[] rowKey;
+ // The row key for the output table is the max timestamp of the scan + data row key
+ if (dataRowKey != null) {
+ rowKey = new byte[keyPrefix.length + dataRowKey.length];
+ Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
+ Bytes.putBytes(rowKey, keyPrefix.length, dataRowKey, 0, dataRowKey.length);
+ } else {
+ rowKey = new byte[keyPrefix.length];
+ Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
+ }
+ Put put = new Put(rowKey);
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
+ scanMaxTs, region.getRegionInfo().getTable().getName());
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
+ scanMaxTs, indexMaintainer.getIndexTableName());
+ if (dataRowKey != null) {
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
+ }
+ if (indexRowKey != null) {
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
+ scanMaxTs, indexRowKey);
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
+ }
+ // Encode the message once and size everything by its byte length: the message can embed
+ // family/qualifier names rendered with Bytes.toString, so its encoded length may differ
+ // from String.length(), which would truncate the message and shift the E:/A: sections.
+ byte[] errorMsgBytes = Bytes.toBytes(errorMsg);
+ byte[] errorMessageBytes;
+ if (expectedValue != null) {
+ errorMessageBytes = new byte[errorMsgBytes.length + expectedValue.length + actualValue.length +
+ TOTAL_PREFIX_LENGTH];
+ Bytes.putBytes(errorMessageBytes, 0, errorMsgBytes, 0, errorMsgBytes.length);
+ int length = errorMsgBytes.length;
+ Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+ length += expectedValue.length;
+ Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
- try {
- int longLength = Long.SIZE / Byte.SIZE;
- byte[] rowKey;
- if (dataRowKey != null) {
- rowKey = new byte[longLength + dataRowKey.length];
- Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
- Bytes.putBytes(rowKey, longLength, dataRowKey, 0, dataRowKey.length);
- } else {
- rowKey = new byte[longLength];
- Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
- }
- Put put = new Put(rowKey);
- long scanMaxTs = scan.getTimeRange().getMax();
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
- scanMaxTs, region.getRegionInfo().getTable().getName());
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
- scanMaxTs, indexMaintainer.getIndexTableName());
- if (dataRowKey != null) {
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
- scanMaxTs, Bytes.toBytes(dataRowTs));
- }
- if (indexRowKey != null) {
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
- scanMaxTs, indexRowKey);
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
- scanMaxTs, Bytes.toBytes(indexRowTs));
- }
- byte[] errorMessageBytes;
- if (expectedValue != null) {
- errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
- TOTAL_PREFIX_LENGTH];
- Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
- int length = errorMsg.length();
- Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
- length += PREFIX_LENGTH;
- Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
- length += expectedValue.length;
- Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
- length += PREFIX_LENGTH;
- Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
-
- }
- else {
- errorMessageBytes = Bytes.toBytes(errorMsg);
- }
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
- outputHTable.put(put);
- } catch (IOException e) {
- exceptionMessage = "LogToIndexToolOutputTable failed " + e;
}
+ else {
+ errorMessageBytes = errorMsgBytes;
+ }
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+ if (isBeforeRebuilt) {
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
+ } else {
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
+ }
+ outputHTable.put(put);
}
private long getMaxTimestamp(Result result) {
@@ -396,19 +683,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
if (indexPut == null) {
- // This means the index row does not have any covered columns. We just need to check if the index row
- // has only one cell (which is the empty column cell)
- if (indexRow.rawCells().length == 1) {
- return true;
- }
- String errorMsg = "Expected to find only empty column cell but got "
- + indexRow.rawCells().length;
+ String errorMsg = "Empty index update";
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
- if (doNotFail) {
- return false;
- }
- exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
- throw new IOException(exceptionMessage);
+ return false;
}
else {
// Remove the empty column prepared by Index codec as we need to change its value
@@ -440,11 +717,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
- if (doNotFail) {
- return false;
- }
- exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
- throw new IOException(exceptionMessage);
+ return false;
}
// Check all columns
if (!CellUtil.matchingValue(actualCell, expectedCell)) {
@@ -452,11 +725,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
- if (doNotFail) {
- return false;
- }
- exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
- throw new IOException(exceptionMessage);
+ return false;
+ } else if (!CellUtil.matchingTimestamp(actualCell, expectedCell)) {
+ String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" +
+ Bytes.toString(qualifier) + " E: " + expectedCell.getTimestamp() + " A: " +
+ actualCell.getTimestamp();
+ logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+ errorMsg, null, null);
+ return false;
}
cellCount++;
}
@@ -465,16 +741,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
String errorMsg = "Expected to find " + cellCount + " cells but got "
+ indexRow.rawCells().length + " cells";
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
- if (!doNotFail) {
- exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
- throw new IOException(exceptionMessage);
- }
return false;
}
return true;
}
- private void verifyIndexRows(ArrayList<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap) throws IOException {
+ private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
+ VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
int expectedRowCount = keys.size();
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
Scan indexScan = new Scan();
@@ -487,15 +760,17 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
Put dataPut = indexKeyToDataPutMap.get(result.getRow());
if (dataPut == null) {
+ // This should never happen
String errorMsg = "Missing data row";
logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
- if (!doNotFail) {
- exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
- throw new IOException(exceptionMessage);
- }
+ exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
+ throw new IOException(exceptionMessage);
}
if (verifySingleIndexRow(result, dataPut)) {
+ verificationPhaseResult.validIndexRowCount++;
perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
+ } else {
+ verificationPhaseResult.invalidIndexRowCount++;
}
rowCount++;
}
@@ -513,6 +788,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
if (isTimestampBeforeTTL(currentTime, ts)) {
itr.remove();
rowCount++;
+ verificationPhaseResult.expiredIndexRowCount++;
}
}
}
@@ -521,11 +797,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
String errorMsg = "Missing index row";
logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
0, errorMsg);
- if (!doNotFail) {
- exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
- throw new IOException(exceptionMessage);
- }
}
+ verificationPhaseResult.missingIndexRowCount += expectedRowCount - rowCount;
}
}
@@ -536,29 +809,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
}
- private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
- byte[] uuidValue = ServerCacheClient.generateId();
- UngroupedAggregateRegionObserver.MutationList currentMutationList =
- new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
- for (Mutation mutation : mutationList) {
- Put put = (Put) mutation;
- currentMutationList.add(mutation);
- setMutationAttributes(put, uuidValue);
- uuidValue = commitIfReady(uuidValue, currentMutationList);
- Delete deleteMarkers = generateDeleteMarkers(put);
- if (deleteMarkers != null) {
- setMutationAttributes(deleteMarkers, uuidValue);
- currentMutationList.add(deleteMarkers);
- uuidValue = commitIfReady(uuidValue, currentMutationList);
- }
- }
- if (!currentMutationList.isEmpty()) {
- ungroupedAggregateRegionObserver.checkForRegionClosing();
- ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize);
- }
- }
-
- private void addVerifyTask(final ArrayList<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap) {
+ private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
+ final VerificationResult.PhaseResult verificationPhaseResult) {
tasks.add(new Task<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -567,13 +819,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
- verifyIndexRows(keys, perTaskDataKeyToDataPutMap);
- if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
- synchronized (dataKeyToDataPutMap) {
- dataKeyToDataPutMap.putAll(perTaskDataKeyToDataPutMap);
- }
- }
- perTaskDataKeyToDataPutMap.clear();
+ verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
} catch (Exception e) {
throw e;
}
@@ -582,21 +828,33 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
});
}
- private void parallelizeIndexVerify() throws IOException {
- addToBeVerifiedIndexRows();
- ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+ private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+ for (Mutation mutation : mutations) {
+ indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
+ }
+ int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
+ tasks = new TaskBatch<>(taskCount);
+ List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
+ List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+ List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ dataPutMapList.add(perTaskDataKeyToDataPutMap);
+ VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+ verificationPhaseResultList.add(perTaskVerificationPhaseResult);
for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
if (keys.size() == rowCountPerTask) {
- addVerifyTask(keys, perTaskDataKeyToDataPutMap);
+ addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
keys = new ArrayList<>(rowCountPerTask);
perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ dataPutMapList.add(perTaskDataKeyToDataPutMap);
+ perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+ verificationPhaseResultList.add(perTaskVerificationPhaseResult);
}
}
if (keys.size() > 0) {
- addVerifyTask(keys, perTaskDataKeyToDataPutMap);
+ addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
}
List<Boolean> taskResultList = null;
try {
@@ -606,8 +864,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
} catch (EarlyExitFailure e) {
throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
- } finally {
- tasks.getTasks().clear();
}
for (Boolean result : taskResultList) {
if (result == null) {
@@ -615,21 +871,61 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
throw new IOException(exceptionMessage);
}
}
+ if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
+ for (Map<byte[], Put> dataPutMap : dataPutMapList) {
+ dataKeyToDataPutMap.putAll(dataPutMap);
+ }
+ }
+ for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
+ verificationPhaseResult.add(result);
+ }
+ }
+
+ /**
+ * Applies the given data-row puts to the region in batches. For each put, the put itself
+ * and (when {@code generateDeleteMarkers} returns one) its delete marker are tagged via
+ * {@code setMutationAttributes} and queued; {@code commitIfReady} is consulted after every
+ * addition and the id it returns is carried forward for the following mutations. Whatever
+ * is still queued at the end is committed in a final batch.
+ *
+ * @param mutationList the mutations to rebuild index rows from; every element must be a Put
+ * @throws IOException if a region-closing check or a batch commit fails
+ */
+ private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+ byte[] batchId = ServerCacheClient.generateId();
+ UngroupedAggregateRegionObserver.MutationList pendingBatch =
+ new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+ for (Mutation mutation : mutationList) {
+ final Put dataPut = (Put) mutation;
+ pendingBatch.add(mutation);
+ setMutationAttributes(dataPut, batchId);
+ batchId = commitIfReady(batchId, pendingBatch);
+ final Delete deleteMarker = generateDeleteMarkers(dataPut);
+ if (deleteMarker == null) {
+ // Nothing to delete for this row
+ continue;
+ }
+ setMutationAttributes(deleteMarker, batchId);
+ pendingBatch.add(deleteMarker);
+ batchId = commitIfReady(batchId, pendingBatch);
+ }
+ if (pendingBatch.isEmpty()) {
+ return;
+ }
+ // Flush the remainder that did not reach the commit threshold
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
+ ungroupedAggregateRegionObserver.commitBatchWithRetries(region, pendingBatch, blockingMemstoreSize);
}
private void verifyAndOrRebuildIndex() throws IOException {
+ VerificationResult nextVerificationResult = new VerificationResult();
+ nextVerificationResult.scannedDataRowCount = mutations.size();
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
// For these options we start with rebuilding index rows
rebuildIndexRows(mutations);
+ nextVerificationResult.rebuiltIndexRowCount = mutations.size();
+ isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.NONE) {
return;
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
+ VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
// For these options we start with verifying index rows
- doNotFail = true; // Don't stop at the first mismatch
- parallelizeIndexVerify();
+ parallelizeIndexVerify(verificationPhaseResult);
+ nextVerificationResult.before.add(verificationPhaseResult);
+ if (mutations.size() != verificationPhaseResult.getTotalCount()) {
+ throw new DoNotRetryIOException(
+ "mutations.size() != verificationPhaseResult.getTotalCount() at the before phase " +
+ nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
+ }
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
// For these options, we have identified the rows to be rebuilt and now need to rebuild them
@@ -639,15 +935,24 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
mutations.add(entry.getValue());
}
rebuildIndexRows(mutations);
+ nextVerificationResult.rebuiltIndexRowCount += mutations.size();
+ isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
// We have rebuilt index row and now we need to verify them
- doNotFail = false; // Stop at the first mismatch
indexKeyToDataPutMap.clear();
- parallelizeIndexVerify();
+ VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+ parallelizeIndexVerify(verificationPhaseResult);
+ nextVerificationResult.after.add(verificationPhaseResult);
+ if (mutations.size() != verificationPhaseResult.getTotalCount()) {
+ throw new DoNotRetryIOException(
+ "mutations.size() != verificationPhaseResult.getTotalCount() at the after phase " +
+ nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
+ }
}
indexKeyToDataPutMap.clear();
+ verificationResult.add(nextVerificationResult);
}
@Override
@@ -713,18 +1018,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
rowCount++;
}
} while (hasMore && rowCount < pageSizeInRows);
- }
- if (!partialRebuild && indexRowKey == null) {
- verifyAndOrRebuildIndex();
- }
- else {
- if (!mutations.isEmpty()) {
- ungroupedAggregateRegionObserver.checkForRegionClosing();
- ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+ if (!partialRebuild && indexRowKey == null) {
+ verifyAndOrRebuildIndex();
+ } else {
+ if (!mutations.isEmpty()) {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
+ ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+ }
}
}
} catch (IOException e) {
- hasMore = false;
LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
throw e;
} finally {
@@ -735,7 +1038,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
dataKeyToDataPutMap.clear();
}
}
-
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 36af59e..8a62c41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -83,6 +83,8 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
+ configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
+ Long.toString(scn));
PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
ServerBuildIndexCompiler compiler =
new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index dd133c0..3a20220 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
@@ -153,6 +154,9 @@ public class IndexTool extends Configured implements Tool {
public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+ public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
+ public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
+ public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
public final static String DATA_TABLE_NAME = "DTName";
public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
public static String INDEX_TABLE_NAME = "ITName";
@@ -167,6 +171,30 @@ public class IndexTool extends Configured implements Tool {
public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
public static String ERROR_MESSAGE = "Error";
public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
+ // Column qualifiers of the index tool result table: scan stop row and overall row counts
+ public static String SCAN_STOP_ROW_KEY = "StopRowKey";
+ public final static byte[] SCAN_STOP_ROW_KEY_BYTES = Bytes.toBytes(SCAN_STOP_ROW_KEY);
+ public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
+ public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
+ public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount";
+ public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT);
+ // Column qualifiers for the counters of the verification phase that runs before the rebuild
+ public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT);
+ public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+ public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
+ public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
+ // Column qualifiers for the counters of the verification phase that runs after the rebuild.
+ // The valid-row qualifier follows the same "AfterRebuild<Kind>IndexRowCount" pattern as its siblings.
+ public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterRebuildValidIndexRowCount";
+ public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
+ public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount";
+ public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+ public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount";
+ public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
+ public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount";
+ public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
+ // Column qualifier in the output table recording the verification phase ("BEFORE" or "AFTER") of an error row
+ public static String VERIFICATION_PHASE = "Phase";
+ public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);
@@ -323,7 +351,7 @@ public class IndexTool extends Configured implements Tool {
formatter.printHelp("help", options);
System.exit(exitCode);
}
-
+
class JobFactory {
Connection connection;
Configuration configuration;
@@ -653,18 +681,25 @@ public class IndexTool extends Configured implements Tool {
return job;
}
- private void createIndexToolOutputTable(Connection connection) throws Exception {
+ /**
+ * Creates the IndexTool output table and the IndexTool result table when they do not
+ * already exist. Each table is created with a single column family and with
+ * MetaDataProtocol.DEFAULT_LOG_TTL set as the TTL value on its table descriptor.
+ *
+ * @param connection connection that is unwrapped to a PhoenixConnection to reach the query services
+ * @throws Exception propagated from the table existence checks or the table creation
+ */
+ private void createIndexToolTables(Connection connection) throws Exception {
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
- HBaseAdmin admin = queryServices.getAdmin();
- if (admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
- return;
+ // NOTE(review): admin is not closed anywhere in this method - confirm who owns its lifecycle.
+ Admin admin = queryServices.getAdmin();
+ // Output table: created only when it is missing.
+ if (!admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
+ HTableDescriptor tableDescriptor = new
+ HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
+ tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+ HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
+ tableDescriptor.addFamily(columnDescriptor);
+ admin.createTable(tableDescriptor);
+ }
+ // Result table: same steps as above, with its own table name and column family.
+ if (!admin.tableExists(TableName.valueOf(RESULT_TABLE_NAME))) {
+ HTableDescriptor tableDescriptor = new
+ HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME));
+ tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+ HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY);
+ tableDescriptor.addFamily(columnDescriptor);
+ admin.createTable(tableDescriptor);
}
- HTableDescriptor tableDescriptor = new
- HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
- tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
- HColumnDescriptor columnDescriptor = new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY);
- tableDescriptor.addFamily(columnDescriptor);
- admin.createTable(tableDescriptor);
}
@Override
@@ -708,7 +743,7 @@ public class IndexTool extends Configured implements Tool {
pIndexTable = null;
connection = ConnectionUtil.getInputConnection(configuration);
- createIndexToolOutputTable(connection);
+ createIndexToolTables(connection);
if (indexTable != null) {
if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index b304dde..4fdc25b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -18,38 +18,93 @@
package org.apache.phoenix.mapreduce.index;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.PIndexState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
+
/**
* Reducer class that does only one task and that is to update the index state of the table.
*/
public class PhoenixIndexImportDirectReducer extends
Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class);
+ // Job configuration assigned in setup() and read by reduce().
private Configuration configuration;
- private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class);
+ // Flipped to true by the first reduce() call; every later call returns without doing any work.
+ private AtomicBoolean calledOnce = new AtomicBoolean(false);
- /**
- * Called once at the start of the task.
- */
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- configuration = context.getConfiguration();
+ /**
+ * Reads the index verification result and publishes it as MR job counters.
+ * <p>
+ * The result is obtained by passing the IndexTool result table and the job's
+ * CURRENT_SCN_VALUE to VerificationResult.getVerificationResult. The scanned-data and
+ * rebuilt-index row counters are always set; the before-rebuild counters are set for the
+ * ONLY, BEFORE and BOTH verification types, and the after-rebuild counters for BOTH and AFTER.
+ *
+ * @param verifyType the verification type the job was configured with
+ * @param context reducer context that supplies the configuration and the counters
+ * @throws IOException if reading the result fails, or if the result reports a verification
+ * failure for {@code verifyType}
+ */
+ private void updateCounters(IndexTool.IndexVerifyType verifyType,
+ Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
+ throws IOException {
+ Configuration configuration = context.getConfiguration();
+ // The result table is declared as a resource so that it is closed along with the connection.
+ try (final Connection connection = ConnectionUtil.getInputConnection(configuration);
+ Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(IndexTool.RESULT_TABLE_NAME_BYTES)) {
+ long ts = Long.parseLong(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+ IndexRebuildRegionScanner.VerificationResult verificationResult =
+ IndexRebuildRegionScanner.VerificationResult.getVerificationResult(hTable, ts);
+ context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
+ setValue(verificationResult.getScannedDataRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
+ setValue(verificationResult.getRebuiltIndexRowCount());
+ if (verifyType == IndexTool.IndexVerifyType.ONLY || verifyType == IndexTool.IndexVerifyType.BEFORE ||
+ verifyType == IndexTool.IndexVerifyType.BOTH) {
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRebuildValidIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRebuildExpiredIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRebuildMissingIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRebuildInvalidIndexRowCount());
+ }
+ if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRebuildValidIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRebuildExpiredIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRebuildMissingIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRebuildInvalidIndexRowCount());
+ }
+ // Counters are set before this check, so they are populated even when the job then fails.
+ if (verificationResult.isVerificationFailed(verifyType)) {
+ throw new IOException("Index verification failed! " + verificationResult);
+ }
+ } catch (Exception e) {
+ // Every failure in the try block, including the verification-failed IOException above,
+ // is rethrown wrapped in this IOException with the original kept as its cause.
+ throw new IOException("Fail to get index verification result", e);
+ }
+ }
@Override
protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
- Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context arg2)
- throws IOException, InterruptedException {
+ Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
+ throws IOException, InterruptedException
+
+ {
+ if (!calledOnce.compareAndSet(false, true)) {
+ return;
+ }
+ IndexTool.IndexVerifyType verifyType = getIndexVerifyType(context.getConfiguration());
+ if (verifyType != IndexTool.IndexVerifyType.NONE) {
+ updateCounters(verifyType, context);
+ }
try {
IndexToolUtil.updateIndexState(configuration, PIndexState.ACTIVE);
} catch (SQLException e) {
@@ -57,4 +112,12 @@ public class PhoenixIndexImportDirectReducer extends
throw new RuntimeException(e.getMessage());
}
}
+
+ /**
+ * Called once at the start of the task. Stores the job configuration in the
+ * {@code configuration} field, which reduce() later passes to IndexToolUtil.updateIndexState.
+ */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ configuration = context.getConfiguration();
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
new file mode 100644
index 0000000..c10694d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+/**
+ * Counters used for Index Tool MR job. PhoenixIndexImportDirectReducer sets each of them
+ * from the matching getter of the index verification result.
+ */
+public enum PhoenixIndexToolJobCounters {
+ // Set for every verification type other than NONE.
+ SCANNED_DATA_ROW_COUNT,
+ REBUILT_INDEX_ROW_COUNT,
+ // Set only for the ONLY, BEFORE and BOTH verification types.
+ BEFORE_REBUILD_VALID_INDEX_ROW_COUNT,
+ BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT,
+ BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT,
+ BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT,
+ // Set only for the BOTH and AFTER verification types.
+ AFTER_REBUILD_VALID_INDEX_ROW_COUNT,
+ AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT,
+ AFTER_REBUILD_MISSING_INDEX_ROW_COUNT,
+ AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 4944c45..92b66cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -917,7 +917,7 @@ public class IndexUtil {
return false;
}
- private static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
+ public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
boolean addedEmptyColumn = false;
Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
while (iterator.hasNext()) {