You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/29 21:27:15 UTC

[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5535 Replay delete markers during server side global index rebuild

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
     new d6a0208  PHOENIX-5535 Replay delete markers during server side global index rebuild
d6a0208 is described below

commit d6a0208ef1290644f9f29a8d656fdc8b2b48d045
Author: Kadir <ko...@salesforce.com>
AuthorDate: Mon Oct 28 13:25:37 2019 -0700

    PHOENIX-5535 Replay delete markers during server side global index rebuild
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 69 ++++++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java          | 62 +++++++++++++++++--
 .../apache/phoenix/index/GlobalIndexChecker.java   |  3 -
 3 files changed, 126 insertions(+), 8 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 6fc01bd..a151395 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -37,8 +38,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -49,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -151,6 +156,70 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
         return list;
     }
 
+    private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
+        for (int i = 1; i <= nrows; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i + 1);
+            if (i % nthRowNull != 0) {
+                stmt.setInt(3, i * i);
+            } else {
+                stmt.setNull(3, Types.INTEGER);
+            }
+            stmt.execute();
+        }
+    }
+
+    @Test
+    public void testWithSetNull() throws Exception {
+        // This test is for building non-transactional mutable global indexes with direct api
+        if (localIndex || transactional || !mutable || !directApi || useSnapshot) {
+            return;
+        }
+        // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
+        // and after that the index table is still rebuilt correctly
+        final int NROWS = 2 * 3 * 5 * 7;
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+                    + tableDDLOptions);
+            String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
+            setEveryNthRowWithNull(NROWS, 2, stmt);
+            conn.commit();
+            setEveryNthRowWithNull(NROWS, 3, stmt);
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE %s INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ",
+                    (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName));
+            // Run the index MR job and verify that the index table is built correctly
+            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            // Check after compaction
+            TestUtil.doMajorCompaction(conn, dataTableFullName);
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            setEveryNthRowWithNull(NROWS, 5, stmt);
+            conn.commit();
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            setEveryNthRowWithNull(NROWS, 7, stmt);
+            conn.commit();
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            TestUtil.doMajorCompaction(conn, dataTableFullName);
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+        }
+    }
+
     @Test
     public void testSecondaryIndex() throws Exception {
         String schemaName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 3cae671..e31c5dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -36,7 +36,10 @@ import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
@@ -1050,6 +1053,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         private Scan scan;
         private RegionScanner innerScanner;
         final Region region;
+        IndexMaintainer indexMaintainer;
 
         IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
                                    final Configuration config) {
@@ -1070,10 +1074,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 useProto = false;
                 indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
             }
+            if (!scan.isRaw()) {
+                List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+                indexMaintainer = maintainers.get(0);
+            }
             this.scan = scan;
             this.innerScanner = innerScanner;
             this.region = region;
         }
+
         @Override
         public HRegionInfo getRegionInfo() {
             return region.getRegionInfo();
@@ -1095,6 +1104,46 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             m.setDurability(Durability.SKIP_WAL);
         }
 
+        private Delete generateDeleteMarkers(List<Cell> row) {
+            Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+            if (row.size() == allColumns.size() + 1) {
+                // We have all the columns for the index table plus the empty column. So, no delete marker is needed
+                return null;
+            }
+            Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
+            long ts = 0;
+            for (Cell cell : row) {
+                includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
+                if (ts < cell.getTimestamp()) {
+                    ts = cell.getTimestamp();
+                }
+            }
+            byte[] rowKey;
+            Delete del = null;
+            for (ColumnReference column : allColumns) {
+                if (!includedColumns.contains(column)) {
+                    if (del == null) {
+                        Cell cell = row.get(0);
+                        rowKey = new byte[cell.getRowLength()];
+                        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+                        del = new Delete(rowKey);
+                    }
+                    del.addColumns(column.getFamily(), column.getQualifier(), ts);
+                }
+            }
+            return del;
+        }
+
+        private byte[] commitIfReady(byte[] uuidValue) throws IOException {
+            if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                checkForRegionClosingOrSplitting();
+                commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+                uuidValue = ServerCacheClient.generateId();
+                mutations.clear();
+            }
+            return uuidValue;
+        }
+
         @Override
         public boolean next(List<Cell> results) throws IOException {
             int rowCount = 0;
@@ -1124,11 +1173,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     del.addDeleteMarker(cell);
                                 }
                             }
-                            if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                                checkForRegionClosingOrSplitting();
-                                commitBatchWithRetries(region, mutations, blockingMemstoreSize);
-                                uuidValue = ServerCacheClient.generateId();
-                                mutations.clear();
+                            uuidValue = commitIfReady(uuidValue);
+                            if (!scan.isRaw()) {
+                                Delete deleteMarkers = generateDeleteMarkers(row);
+                                if (deleteMarkers != null) {
+                                    setMutationAttributes(deleteMarkers, uuidValue);
+                                    mutations.add(deleteMarkers);
+                                    uuidValue = commitIfReady(uuidValue);
+                                }
                             }
                             rowCount++;
                         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index c78bce7..ecc720f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -302,9 +302,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             buildIndexScan.setStartRow(dataRowKey);
             buildIndexScan.setStopRow(dataRowKey);
             buildIndexScan.setTimeRange(0, maxTimestamp);
-            // If the data table row has been deleted then we want to delete the corresponding index row too.
-            // Thus, we are using a raw scan
-            buildIndexScan.setRaw(true);
             try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
                 resultScanner.next();
             } catch (Throwable t) {