You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/29 21:28:40 UTC
[phoenix] branch 4.14-HBase-1.4 updated: PHOENIX-5535 Replay delete
markers during server side global index rebuild
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
new 020ec5f PHOENIX-5535 Replay delete markers during server side global index rebuild
020ec5f is described below
commit 020ec5f413e64a59b5285387f4d764ab6805687b
Author: Kadir <ko...@salesforce.com>
AuthorDate: Mon Oct 28 13:25:37 2019 -0700
PHOENIX-5535 Replay delete markers during server side global index rebuild
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 69 ++++++++++++++++++++++
.../UngroupedAggregateRegionObserver.java | 62 +++++++++++++++++--
.../apache/phoenix/index/GlobalIndexChecker.java | 3 -
3 files changed, 126 insertions(+), 8 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 6fc01bd..a151395 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -37,8 +38,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
@@ -49,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -151,6 +156,70 @@ public class IndexToolIT extends ParallelStatsEnabledIT {
return list;
}
+ private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception { // Executes stmt once for each i in 1..nrows, binding SQL NULL to parameter 3 whenever i is a multiple of nthRowNull
+ for (int i = 1; i <= nrows; i++) {
+ stmt.setInt(1, i); // parameter 1 is the loop counter itself
+ stmt.setInt(2, i + 1); // parameter 2 is always non-null
+ if (i % nthRowNull != 0) {
+ stmt.setInt(3, i * i);
+ } else {
+ stmt.setNull(3, Types.INTEGER); // i is a multiple of nthRowNull: bind NULL instead of i * i
+ }
+ stmt.execute(); // only executes the statement; no commit is issued in this helper
+ }
+ }
+
+ @Test
+ public void testWithSetNull() throws Exception {
+ // This test is for building non-transactional mutable global indexes with direct api
+ if (localIndex || transactional || !mutable || !directApi || useSnapshot) {
+ return; // any other parameter combination makes this test a no-op
+ }
+ // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
+ // and after that the index table is still rebuilt correctly
+ final int NROWS = 2 * 3 * 5 * 7; // 210: a common multiple of the null periods 2, 3, 5 and 7 passed to setEveryNthRowWithNull below
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ + tableDDLOptions);
+ String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"; // positional parameters follow the DDL column order: ID, VAL1, VAL2
+ PreparedStatement stmt = conn.prepareStatement(upsertStmt); // NOTE(review): stmt is never closed explicitly in this method
+ setEveryNthRowWithNull(NROWS, 2, stmt); // first pass: third parameter (VAL2) is NULL on every 2nd row
+ conn.commit();
+ setEveryNthRowWithNull(NROWS, 3, stmt); // second pass over the same IDs: VAL2 is NULL on every 3rd row instead
+ conn.commit();
+ conn.createStatement().execute(String.format(
+ "CREATE %s INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ",
+ (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName)); // localIndex is always false here because of the early return above
+ // Run the index MR job and verify that the index table is built correctly
+ IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+ assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); // expects one input record per upserted ID
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ // Check after compaction
+ TestUtil.doMajorCompaction(conn, dataTableFullName);
+ actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ setEveryNthRowWithNull(NROWS, 5, stmt); // third pass, issued after the index has been built
+ conn.commit();
+ actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ setEveryNthRowWithNull(NROWS, 7, stmt); // fourth pass with yet another null period
+ conn.commit();
+ actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ TestUtil.doMajorCompaction(conn, dataTableFullName); // re-check once more after compacting the data table
+ actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ }
+ }
+
@Test
public void testSecondaryIndex() throws Exception {
String schemaName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 3cae671..e31c5dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -36,7 +36,10 @@ import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -1050,6 +1053,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private Scan scan;
private RegionScanner innerScanner;
final Region region;
+ IndexMaintainer indexMaintainer;
IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final Configuration config) {
@@ -1070,10 +1074,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
useProto = false;
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
+ if (!scan.isRaw()) {
+ List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+ indexMaintainer = maintainers.get(0);
+ }
this.scan = scan;
this.innerScanner = innerScanner;
this.region = region;
}
+
@Override
public HRegionInfo getRegionInfo() {
return region.getRegionInfo();
@@ -1095,6 +1104,46 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
m.setDurability(Durability.SKIP_WAL);
}
+ private Delete generateDeleteMarkers(List<Cell> row) { // Builds a Delete for every column in indexMaintainer.getAllColumns() that has no cell in row; returns null when nothing is missing
+ Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+ if (row.size() == allColumns.size() + 1) {
+ // We have all the columns for the index table plus the empty column. So, no delete marker is needed
+ return null;
+ }
+ Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size()); // columns that do have a cell in this row
+ long ts = 0; // ends up as the largest cell timestamp seen in row
+ for (Cell cell : row) {
+ includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
+ if (ts < cell.getTimestamp()) {
+ ts = cell.getTimestamp();
+ }
+ }
+ byte[] rowKey;
+ Delete del = null; // created lazily, so it stays null if no column turns out to be missing
+ for (ColumnReference column : allColumns) {
+ if (!includedColumns.contains(column)) {
+ if (del == null) {
+ Cell cell = row.get(0); // NOTE(review): assumes row is non-empty — confirm the caller guarantees this
+ rowKey = new byte[cell.getRowLength()];
+ System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength()); // copy the row key out of the cell's backing array
+ del = new Delete(rowKey);
+ }
+ del.addColumns(column.getFamily(), column.getQualifier(), ts); // per the HBase Delete API, addColumns covers all versions at or before ts
+ }
+ }
+ return del;
+ }
+
+ private byte[] commitIfReady(byte[] uuidValue) throws IOException { // Commits the pending mutations when ServerUtil.readyToCommit says so; returns the uuid value the caller should use from here on
+ if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ checkForRegionClosingOrSplitting(); // runs before the commit is attempted
+ commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+ uuidValue = ServerCacheClient.generateId(); // a fresh id replaces the passed-in one after each commit
+ mutations.clear(); // start the next batch empty
+ }
+ return uuidValue; // unchanged from the argument when no commit happened
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
int rowCount = 0;
@@ -1124,11 +1173,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
del.addDeleteMarker(cell);
}
}
- if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- checkForRegionClosingOrSplitting();
- commitBatchWithRetries(region, mutations, blockingMemstoreSize);
- uuidValue = ServerCacheClient.generateId();
- mutations.clear();
+ uuidValue = commitIfReady(uuidValue);
+ if (!scan.isRaw()) {
+ Delete deleteMarkers = generateDeleteMarkers(row);
+ if (deleteMarkers != null) {
+ setMutationAttributes(deleteMarkers, uuidValue);
+ mutations.add(deleteMarkers);
+ uuidValue = commitIfReady(uuidValue);
+ }
}
rowCount++;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 7848047..41973f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -302,9 +302,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
buildIndexScan.setStartRow(dataRowKey);
buildIndexScan.setStopRow(dataRowKey);
buildIndexScan.setTimeRange(0, maxTimestamp);
- // If the data table row has been deleted then we want to delete the corresponding index row too.
- // Thus, we are using a raw scan
- buildIndexScan.setRaw(true);
try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
resultScanner.next();
} catch (Throwable t) {