You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/25 20:38:40 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5539 Full row index write at the last write phase for mutable global indexes
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new e73eb14 PHOENIX-5539 Full row index write at the last write phase for mutable global indexes
e73eb14 is described below
commit e73eb147ab24d078e1baf5c045428838f7175855
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Oct 24 01:10:32 2019 -0700
PHOENIX-5539 Full row index write at the last write phase for mutable global indexes
---
.../end2end/index/GlobalIndexCheckerIT.java | 50 ++++++++++++++++
.../phoenix/hbase/index/IndexRegionObserver.java | 68 ++++++++++++----------
2 files changed, 87 insertions(+), 31 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 63c6e75..7c823ea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end.index;
+import static org.apache.phoenix.end2end.index.ImmutableIndexIT.verifyRowsForEmptyColValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -33,11 +34,19 @@ import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -257,6 +266,47 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ public static void checkUnverifiedCellCount(Connection conn, String indexTableName) throws Exception {
+ Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(Bytes.toBytes(indexTableName));
+ long indexCnt = TestUtil.getRowCount(hIndexTable, false);
+ assertEquals(1, indexCnt);
+ assertEquals(true, verifyRowsForEmptyColValue(conn, indexTableName, IndexRegionObserver.UNVERIFIED_BYTES));
+ Scan s = new Scan();
+ int cntCellValues = 0;
+ try (ResultScanner scanner = hIndexTable.getScanner(s)) {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ CellScanner cellScanner = result.cellScanner();
+ while (cellScanner.advance()) {
+ cntCellValues++;
+ }
+ }
+ }
+ assertEquals(1, cntCellValues);
+ }
+
+    /**
+     * Makes the last write phase (the post index update phase) fail. It then checks that the
+     * index row left behind is unverified and contains only the empty cell, i.e. the full index
+     * row is not sent in the first write phase.
+     */
+    @Test
+    public void testUnverifiedRowIncludesOnlyEmptyCell() throws Exception {
+        String dataTableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)");
+            // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase)
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            try {
+                conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+                conn.commit();
+            } finally {
+                // Always reset the failure injection. It is set through a static method, so if it
+                // were left on after an exception it would affect subsequent tests.
+                IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            }
+            // Check that the first write phase did not send the full index row,
+            // by counting the cells of the (unverified) index row.
+            checkUnverifiedCellCount(conn, indexTableName);
+            // Add rows and check that the data and index tables are still consistent
+            verifyTableHealth(conn, dataTableName, indexTableName);
+        }
+    }
+
@Test
public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 993ff4b..a41e729 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -500,9 +500,25 @@ public class IndexRegionObserver extends BaseRegionObserver {
return mutations;
}
+ public void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
+ List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
+ if (cellList == null) {
+ return;
+ }
+ Iterator<Cell> cellIterator = cellList.iterator();
+ while (cellIterator.hasNext()) {
+ Cell cell = cellIterator.next();
+ if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ emptyCQ, 0, emptyCQ.length) == 0) {
+ cellIterator.remove();
+ return;
+ }
+ }
+ }
+
private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context,
- Collection<? extends Mutation> mutations) throws Throwable {
+ Collection<? extends Mutation> mutations, long now) throws Throwable {
IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
throw new DoNotRetryIOException(
@@ -511,7 +527,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
- List<Pair<Mutation, byte[]>> indexUpdatesForDeletes;
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
Span current = scope.getSpan();
@@ -528,7 +543,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
- indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
+ context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
while(indexUpdatesItr.hasNext()) {
Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
@@ -549,35 +564,30 @@ public class IndexRegionObserver extends BaseRegionObserver {
// add the VERIFIED cell, which is the empty cell
Mutation m = next.getFirst().getFirst();
boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
- long ts = getMaxTimestamp(m);
if (rebuild) {
if (m instanceof Put) {
+ long ts = getMaxTimestamp(m);
+ // Remove the empty column prepared by Index codec as we need to change its value
+ removeEmptyColumn(m, emptyCF, emptyCQ);
((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
}
} else {
- if (m instanceof Put) {
- ((Put)m).addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
- // Ignore post index updates (i.e., the third write phase updates) for this row if it is
- // going through concurrent updates
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
- if (!context.pendingRows.contains(rowKey)) {
- Put put = new Put(m.getRow());
- put.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
- context.intermediatePostIndexUpdates.add(new Pair<>(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()), next.getSecond()));
- }
- } else {
- // For a delete mutation, first unverify the existing row in the index table and then delete
- // the row from the index table after deleting the corresponding row from the data table
- indexUpdatesItr.remove();
- Put put = new Put(m.getRow());
- put.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
- indexUpdatesForDeletes.add(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()));
- // Ignore post index updates (i.e., the third write phase updates) for this row if it is
- // going through concurrent updates
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
- if (!context.pendingRows.contains(rowKey)) {
- context.intermediatePostIndexUpdates.add(next);
+ indexUpdatesItr.remove();
+ // For this mutation whether it is put or delete, set the status of the index row "unverified"
+ // This will be done before the data table row is updated (i.e., in the first write phase)
+ Put unverifiedPut = new Put(m.getRow());
+ unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+ context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
+ // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+ // going through concurrent updates
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
+ if (!context.pendingRows.contains(rowKey)) {
+ if (m instanceof Put) {
+ // Remove the empty column prepared by Index codec as we need to change its value
+ removeEmptyColumn(m, emptyCF, emptyCQ);
+ ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
}
+ context.intermediatePostIndexUpdates.add(next);
}
}
}
@@ -586,10 +596,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
miniBatchOp.addOperationsFromCP(0,
localUpdates.toArray(new Mutation[localUpdates.size()]));
}
- if (!indexUpdatesForDeletes.isEmpty()) {
- context.preIndexUpdates = indexUpdatesForDeletes;
- }
-
if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
}
@@ -628,7 +634,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
long start = EnvironmentEdgeManager.currentTimeMillis();
- prepareIndexMutations(c, miniBatchOp, context, mutations);
+ prepareIndexMutations(c, miniBatchOp, context, mutations, now);
metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
// Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to