You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/22 04:36:57 UTC

[phoenix] branch 4.14-HBase-1.4 updated: Revert "PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers"

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
     new fcbbf11  Revert "PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers"
fcbbf11 is described below

commit fcbbf1191857a7a03d8710f3ff5e66584a97972a
Author: Kadir <ko...@salesforce.com>
AuthorDate: Mon Oct 21 21:36:54 2019 -0700

    Revert "PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers"
    
    This reverts commit c3e17346e1314f3be87e4cb3693f236288a74c47.
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 67 ----------------------
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 29 ++++------
 .../UngroupedAggregateRegionObserver.java          | 19 +-----
 .../apache/phoenix/index/GlobalIndexChecker.java   |  9 ++-
 4 files changed, 16 insertions(+), 108 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 9c1049d..9cc2393 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,7 +29,6 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -246,72 +245,6 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
         }
     }
 
-    private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
-        for (int i = 0; i < nrows; i++) {
-            stmt.setInt(1, i);
-            stmt.setInt(2, i * 10);
-            if (i % nthRowNull != 0) {
-                stmt.setInt(3, 9000 + i * nthRowNull);
-            } else {
-                stmt.setNull(3, Types.INTEGER);
-            }
-            stmt.execute();
-        }
-    }
-
-    @Test
-    public void testWithSetNull() throws Exception {
-        // This test is for building non-transactional global indexes with direct api
-        if (localIndex || transactional) {
-            return;
-        }
-        // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
-        // and after that the index table is still rebuilt correctly
-        final int NROWS = 2 * 3 * 5 * 7;
-        String schemaName = generateUniqueName();
-        String dataTableName = generateUniqueName();
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        String indexTableName = generateUniqueName();
-        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String stmString1 =
-                    "CREATE TABLE " + dataTableFullName
-                            + " (ID INTEGER NOT NULL PRIMARY KEY, VAL INTEGER, ZIP INTEGER) "
-                            + tableDDLOptions;
-            conn.createStatement().execute(stmString1);
-            String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
-            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
-            setEveryNthRowWithNull(NROWS, 2, stmt);
-            conn.commit();
-            setEveryNthRowWithNull(NROWS, 3, stmt);
-            conn.commit();
-            String stmtString2 =
-                    String.format(
-                            "CREATE %s INDEX %s ON %s (VAL) INCLUDE (ZIP) ASYNC ",
-                            (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
-            conn.createStatement().execute(stmtString2);
-            // Run the index MR job and verify that the index table is built correctly
-            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-
-            // Repeat the test with compaction
-            setEveryNthRowWithNull(NROWS, 5, stmt);
-            conn.commit();
-            setEveryNthRowWithNull(NROWS, 7, stmt);
-            conn.commit();
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
-            // Run the index MR job and verify that the index table is built correctly
-            indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-        }
-    }
-
-
     @Test
     public void testBuildSecondaryIndexAndScrutinize() throws Exception {
         // This test is for building non-transactional global indexes with direct api
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 8035982..7d1c1b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -89,12 +89,7 @@ public class ServerBuildIndexCompiler {
 
     public MutationPlan compile(PTable index) throws SQLException {
         try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
-            String query;
-            if (index.getIndexType() == PTable.IndexType.LOCAL) {
-                query = "SELECT count(*) FROM " + tableName;
-            } else {
-                query = "SELECT * FROM " + tableName;
-            }
+            String query = "SELECT count(*) FROM " + tableName;
             this.plan = statement.compileQuery(query);
             TableRef tableRef = plan.getTableRef();
             Scan scan = plan.getContext().getScan();
@@ -115,28 +110,26 @@ public class ServerBuildIndexCompiler {
             // the index table possibly remotely
             if (index.getIndexType() == PTable.IndexType.LOCAL) {
                 scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr));
-                // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
-                // However, in this case, we need to project all of the data columns that contribute to the index.
-                IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
-                for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
-                    if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                        scan.addFamily(columnRef.getFamily());
-                    } else {
-                        scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
-                    }
-                }
             } else {
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
                 scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
-                scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, TRUE_BYTES);
+            }
+            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+            // However, in this case, we need to project all of the data columns that contribute to the index.
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+                if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    scan.addFamily(columnRef.getFamily());
+                } else {
+                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+                }
             }
 
             if (dataTable.isTransactional()) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
             }
 
-
             // Go through MutationPlan abstraction so that we can create local indexes
             // with a connectionless connection (which makes testing easier).
             return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d6a9506..0166206 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1047,20 +1047,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean hasMore;
         int rowCount = 0;
-        RegionScanner newScanner;
-        if (!scan.isRaw()) {
-            // We need to use raw scan here to replay delete markers too (PHOENIX-5535)
-            scan.getFamilyMap().clear();
-            scan.setRaw(true);
-            scan.setCacheBlocks(false);
-            scan.setMaxVersions();
-            newScanner = region.getScanner(scan);
-        }
-        else {
-            // The scan is already raw, so do not do anything
-            newScanner = innerScanner;
-        }
-
         try {
             int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
             long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
@@ -1072,7 +1058,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             synchronized (innerScanner) {
                 do {
                     List<Cell> results = new ArrayList<Cell>();
-                    hasMore =  newScanner.nextRaw(results);
+                    hasMore = innerScanner.nextRaw(results);
                     if (!results.isEmpty()) {
                         Put put = null;
                         Delete del = null;
@@ -1125,9 +1111,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
             throw e;
         } finally {
-            if (newScanner != innerScanner) {
-                newScanner.close();
-            }
             region.closeRegionOperation();
         }
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index f2b35a0..21c2e69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -299,17 +299,16 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
                 buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
-                // We want delete markers to be replayed during index rebuild.
-                buildIndexScan.setRaw(true);
-                buildIndexScan.setCacheBlocks(false);
-                buildIndexScan.setMaxVersions();
-                buildIndexScan.setTimeRange(0, maxTimestamp);
             }
             // Rebuild the index row from the corresponding the row in the the data table
             // Get the data row key from the index row key
             byte[] dataRowKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants);
             buildIndexScan.setStartRow(dataRowKey);
             buildIndexScan.setStopRow(dataRowKey);
+            buildIndexScan.setTimeRange(0, maxTimestamp);
+            // If the data table row has been deleted then we want to delete the corresponding index row too.
+            // Thus, we are using a raw scan
+            buildIndexScan.setRaw(true);
             try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
                 resultScanner.next();
             } catch (Throwable t) {