Posted to commits@phoenix.apache.org by ac...@apache.org on 2020/01/16 05:52:28 UTC
[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5676 Inline-verification from IndexTool does not handle TTL/row-expiry
This is an automated email from the ASF dual-hosted git repository.
achouhan pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
new 90126c4 PHOENIX-5676 Inline-verification from IndexTool does not handle TTL/row-expiry
90126c4 is described below
commit 90126c49b0877dcf6bbdd58e421e8228171f8204
Author: Abhishek Singh Chouhan <ac...@apache.org>
AuthorDate: Wed Jan 15 17:17:41 2020 -0800
PHOENIX-5676 Inline-verification from IndexTool does not handle TTL/row-expiry
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 71 +++++++++++++++++++++-
.../coprocessor/IndexRebuildRegionScanner.java | 62 +++++++++++++++----
2 files changed, 118 insertions(+), 15 deletions(-)
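For context before the diff: the core of this fix is to treat a missing index cell or row as legitimately absent, rather than as a verification failure, when its timestamp is older than the column family TTL. A minimal standalone sketch of that check follows; the class and method names are illustrative only, and FOREVER stands in for HBase's HConstants.FOREVER (Integer.MAX_VALUE), which is the value the commit compares against.

    public final class TtlExpiryCheckSketch {
        // Mirrors HConstants.FOREVER: a TTL of Integer.MAX_VALUE means cells never expire.
        private static final int FOREVER = Integer.MAX_VALUE;

        // Returns true if a cell written at tsMillis has already expired under ttlSeconds.
        static boolean isExpired(long nowMillis, long tsMillis, int ttlSeconds) {
            if (ttlSeconds == FOREVER) {
                return false;
            }
            return tsMillis < nowMillis - (long) ttlSeconds * 1000L;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            // A cell written two seconds ago under a one-second TTL has expired, so a
            // missing index cell/row with that timestamp is not a verification error.
            System.out.println(isExpired(now, now - 2000L, 1));       // true
            // Under the default "forever" TTL nothing expires.
            System.out.println(isExpired(now, now - 2000L, FOREVER)); // false
        }
    }

The commit's isTimestampBeforeTTL() method in IndexRebuildRegionScanner makes the same comparison, applying it both to individual missing cells and to whole missing index rows before they are reported to the IndexTool output table.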
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 490ecb5..d79689d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -59,14 +60,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
-
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
@@ -579,6 +580,72 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
+ public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
+ if (localIndex || transactional || !directApi || useSnapshot) {
+ return;
+ }
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+ // Insert a row
+ conn.createStatement()
+ .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+ conn.commit();
+ conn.createStatement()
+ .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
+ indexTableName, dataTableFullName));
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+ IndexTool.IndexVerifyType.ONLY);
+ Cell cell =
+ getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
+ indexTableFullName);
+ byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
+ assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
+ cell.getValueLength(), expectedValueBytes, 0, expectedValueBytes.length) == 0);
+
+ // Run the index tool to populate the index while verifying rows
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+ IndexTool.IndexVerifyType.AFTER);
+
+ // Set ttl of index table ridiculously low so that all data is expired
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ TableName indexTable = TableName.valueOf(indexTableFullName);
+ HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
+ desc.setTimeToLive(1);
+ admin.modifyColumn(indexTable, desc);
+ Thread.sleep(1000);
+ Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
+ int retry = 0;
+ while (retry++ < 20 && status.getFirst() != 0) {
+ Thread.sleep(2000);
+ status = admin.getAlterStatus(indexTable);
+ }
+ assertTrue(status.getFirst() == 0);
+
+ TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+ admin.disableTable(indexToolOutputTable);
+ admin.deleteTable(indexToolOutputTable);
+ // Run the index tool using the only-verify option, verify it gives no mismatch
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+ IndexTool.IndexVerifyType.ONLY);
+ Scan scan = new Scan();
+ Table hIndexToolTable =
+ conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(indexToolOutputTable.getName());
+ Result r = hIndexToolTable.getScanner(scan).next();
+ assertTrue(r == null);
+ admin.disableTable(indexToolOutputTable);
+ admin.deleteTable(indexToolOutputTable);
+ }
+ }
+
+ @Test
public void testIndexToolWithTenantId() throws Exception {
if (!useTenantId) { return;}
String tenantId = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 7f85812..527d78c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -17,16 +17,26 @@
*/
package org.apache.phoenix.coprocessor;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -46,6 +56,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.SkipScanFilter;
@@ -74,16 +85,9 @@ import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
-import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
-import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
-import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
public class IndexRebuildRegionScanner extends BaseRegionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
@@ -119,6 +123,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
private RegionCoprocessorEnvironment env;
private HTableFactory hTableFactory;
+ private int indexTableTTL;
IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env,
@@ -159,6 +164,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
// Create the following objects only for rebuilds by IndexTool
hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
@@ -413,6 +419,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
int cellCount = 0;
+ long currentTime = EnvironmentEdgeManager.currentTime();
for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
if (cells == null) {
break;
@@ -422,6 +429,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
if (actualCell == null) {
+ // Check whether the cell has expired based on the current server time and the
+ // index table TTL. The index table should have the same TTL as the data table,
+ // so the cell may legitimately be gone if the row expired between our rebuild
+ // and this verification pass.
+ // TODO: have a metric to update for these cases
+ if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) {
+ continue;
+ }
String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
@@ -487,6 +502,20 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
} catch (Throwable t) {
ServerUtil.throwIOException(indexHTable.getName().toString(), t);
}
+ // Check whether any expected index rows we did not get back have already expired due to TTL
+ // TODO: metrics for expired rows
+ if (!perTaskDataKeyToDataPutMap.isEmpty()) {
+ Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ while(itr.hasNext()) {
+ Entry<byte[], Put> entry = itr.next();
+ long ts = getMaxTimestamp(entry.getValue());
+ if (isTimestampBeforeTTL(currentTime, ts)) {
+ itr.remove();
+ rowCount++;
+ }
+ }
+ }
if (rowCount != expectedRowCount) {
for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
String errorMsg = "Missing index row";
@@ -500,6 +529,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
}
+ private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
+ if (indexTableTTL == HConstants.FOREVER) {
+ return false;
+ }
+ return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
+ }
+
private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
byte[] uuidValue = ServerCacheClient.generateId();
UngroupedAggregateRegionObserver.MutationList currentMutationList =