Posted to commits@phoenix.apache.org by ac...@apache.org on 2020/01/16 06:53:28 UTC

[phoenix] branch 4.14-HBase-1.4 updated: PHOENIX-5676 Inline-verification from IndexTool does not handle TTL/row-expiry

This is an automated email from the ASF dual-hosted git repository.

achouhan pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
     new 130f5cb  PHOENIX-5676 Inline-verification from IndexTool does not handle TTL/row-expiry
130f5cb is described below

commit 130f5cb3b2d04a1846a80df45d6b1a142ffa3a33
Author: Abhishek Singh Chouhan <ac...@apache.org>
AuthorDate: Wed Jan 15 22:50:20 2020 -0800

    PHOENIX-5676 Inline-verification from IndexTool does not handle TTL/row-expiry
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 71 +++++++++++++++++++++-
 .../coprocessor/IndexRebuildRegionScanner.java     | 62 +++++++++++++++----
 2 files changed, 118 insertions(+), 15 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 5187639..b3b1613 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -59,15 +60,15 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
 import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
-
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
@@ -571,6 +572,72 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
     }
 
     @Test
+    public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            // Insert a row
+            conn.createStatement()
+                    .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            conn.createStatement()
+                    .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
+                        indexTableName, dataTableFullName));
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                IndexTool.IndexVerifyType.ONLY);
+            Cell cell =
+                    getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
+                        indexTableFullName);
+            byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
+            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
+                cell.getValueLength(), expectedValueBytes, 0, expectedValueBytes.length) == 0);
+
+            // Run the index tool to populate the index while verifying rows
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                IndexTool.IndexVerifyType.AFTER);
+
+            // Set ttl of index table ridiculously low so that all data is expired
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexTable = TableName.valueOf(indexTableFullName);
+            HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
+            desc.setTimeToLive(1);
+            admin.modifyColumn(indexTable, desc);
+            Thread.sleep(1000);
+            Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
+            int retry = 0;
+            while (retry++ < 20 && status.getFirst() != 0) {
+                Thread.sleep(2000);
+                status = admin.getAlterStatus(indexTable);
+            }
+            assertTrue(status.getFirst() == 0);
+
+            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+            // Run the index tool using the only-verify option and verify it reports no mismatch
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                IndexTool.IndexVerifyType.ONLY);
+            Scan scan = new Scan();
+            Table hIndexToolTable =
+                    conn.unwrap(PhoenixConnection.class).getQueryServices()
+                            .getTable(indexToolOutputTable.getName());
+            Result r = hIndexToolTable.getScanner(scan).next();
+            assertTrue(r == null);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+        }
+    }
+
+    @Test
     public void testIndexToolWithTenantId() throws Exception {
         if (!useTenantId) { return;}
         String tenantId = generateUniqueName();
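A side note on the polling in the new test above: HBase 1.x's Admin#getAlterStatus(TableName) returns a Pair whose first element is the number of regions that have not yet picked up the schema change. A minimal sketch of a bounded wait around it, with hypothetical names (this helper is not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.util.Pair;

    // Hypothetical helper: poll until the alter has reached every region,
    // giving up after maxRetries attempts of ~2s each.
    final class AlterStatusWaiter {
        static boolean waitForAlter(Admin admin, TableName table, int maxRetries)
                throws IOException, InterruptedException {
            for (int retry = 0; retry < maxRetries; retry++) {
                // getAlterStatus() returns (regions yet to update, total regions)
                Pair<Integer, Integer> status = admin.getAlterStatus(table);
                if (status.getFirst() == 0) {
                    return true; // schema change is visible everywhere
                }
                Thread.sleep(2000);
            }
            return false; // caller decides whether to fail the test
        }
    }
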
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 7f85812..527d78c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -17,16 +17,26 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -46,6 +56,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.filter.SkipScanFilter;
@@ -74,16 +85,9 @@ import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
-import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
-import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
-import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
-import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
@@ -119,6 +123,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
     private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
     private RegionCoprocessorEnvironment env;
     private HTableFactory hTableFactory;
+    private int indexTableTTL;
 
     IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
                                final RegionCoprocessorEnvironment env,
@@ -159,6 +164,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 // Create the following objects only for rebuilds by IndexTool
                 hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
                 indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+                indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
                 outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
                 indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
@@ -413,6 +419,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
                 indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
         int cellCount = 0;
+        long currentTime = EnvironmentEdgeManager.currentTime();
         for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
             if (cells == null) {
                 break;
@@ -422,6 +429,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
                 byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
                 Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
                 if (actualCell == null) {
+                    // Check if the cell has expired as per the current server time and the
+                    // table's TTL. The index table should have the same TTL as the data table,
+                    // so we may not get a value back from the index if the row expired between
+                    // our rebuild and verify passes.
+                    // TODO: have a metric to update for these cases
+                    if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) {
+                        continue;
+                    }
                     String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
                             Bytes.toString(qualifier);
                     logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
@@ -487,6 +502,20 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         } catch (Throwable t) {
             ServerUtil.throwIOException(indexHTable.getName().toString(), t);
         }
+        // Check if any expected index rows (which we did not get back) have already expired due to TTL
+        // TODO: metrics for expired rows
+        if (!perTaskDataKeyToDataPutMap.isEmpty()) {
+            Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
+            long currentTime = EnvironmentEdgeManager.currentTime();
+            while (itr.hasNext()) {
+                Entry<byte[], Put> entry = itr.next();
+                long ts = getMaxTimestamp(entry.getValue());
+                if (isTimestampBeforeTTL(currentTime, ts)) {
+                    itr.remove();
+                    rowCount++;
+                }
+            }
+        }
         if (rowCount != expectedRowCount) {
             for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
                 String errorMsg = "Missing index row";
@@ -500,6 +529,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
         }
     }
 
+    private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
+        if (indexTableTTL == HConstants.FOREVER) {
+            return false;
+        }
+        return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
+    }
+
     private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
         byte[] uuidValue = ServerCacheClient.generateId();
         UngroupedAggregateRegionObserver.MutationList currentMutationList =
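The heart of the fix is the cutoff arithmetic that both call sites share via isTimestampBeforeTTL(): HBase TTLs are expressed in seconds while cell timestamps are in milliseconds, so a cell counts as expired once its timestamp falls before currentTime - ttl * 1000, and HConstants.FOREVER disables the check. A standalone sketch with a worked example (hypothetical class and method names, not part of the patch):

    import org.apache.hadoop.hbase.HConstants;

    // Hypothetical rendering of the patch's expiry check.
    final class TtlCheck {
        // ttlSeconds follows HBase conventions: HConstants.FOREVER means no expiry.
        static boolean isExpired(long currentTimeMs, long cellTimestampMs, int ttlSeconds) {
            if (ttlSeconds == HConstants.FOREVER) {
                return false;
            }
            // Cast before multiplying so a large TTL in seconds cannot overflow int.
            return cellTimestampMs < currentTimeMs - (long) ttlSeconds * 1000;
        }

        public static void main(String[] args) {
            // Worked example: TTL = 1 second, "now" = 10,000 ms.
            // Anything written before the 9,000 ms mark has expired.
            System.out.println(isExpired(10_000L, 8_500L, 1)); // true
            System.out.println(isExpired(10_000L, 9_500L, 1)); // false
        }
    }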