You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/03/22 16:57:20 UTC
[2/4] phoenix git commit: PHOENIX-4659 Use coprocessor API to write
local transactional indexes
PHOENIX-4659 Use coprocessor API to write local transactional indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b61e72f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b61e72f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b61e72f0
Branch: refs/heads/4.x-HBase-1.3
Commit: b61e72f007e46199ef27bd3308bec4f4bf448c67
Parents: 5814fcb
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Mar 17 14:18:59 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Mar 22 09:45:35 2018 -0700
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 7 ++-
.../end2end/index/txn/TxWriteFailureIT.java | 6 +--
.../index/PhoenixTransactionalIndexer.java | 29 +++++++++++-
.../phoenix/transaction/TransactionFactory.java | 13 ++++++
.../apache/phoenix/util/TransactionUtil.java | 47 ++++++++++++++++++++
5 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 38bde7f..dfbaf3f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -469,6 +469,7 @@ public class MutableIndexFailureIT extends BaseTest {
// uncommitted data when the DELETE is executed.
FailingRegionObserver.FAIL_WRITE = true;
try {
+ FailingRegionObserver.FAIL_NEXT_WRITE = localIndex && transactional;
conn.commit();
if (commitShouldFail && (!localIndex || transactional) && this.throwIndexWriteFailure) {
fail();
@@ -504,13 +505,17 @@ public class MutableIndexFailureIT extends BaseTest {
public static class FailingRegionObserver extends SimpleRegionObserver {
public static volatile boolean FAIL_WRITE = false;
+ public static volatile boolean FAIL_NEXT_WRITE = false;
public static final String FAIL_INDEX_NAME = "FAIL_IDX";
public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
boolean throwException = false;
- if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
+ if (FAIL_NEXT_WRITE) {
+ throwException = true;
+ FAIL_NEXT_WRITE = false;
+ } else if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
&& FAIL_WRITE) {
throwException = true;
} else {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
index ec60151..049611c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
@@ -105,7 +105,9 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
@Test
public void testIndexTableWriteFailure() throws Exception {
- helpTestWriteFailure(true);
+ if (!localIndex) { // We cannot fail the index write for local indexes because of the way they're written
+ helpTestWriteFailure(true);
+ }
}
@Test
@@ -175,8 +177,6 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
assertEquals("k3", rs.getString(1));
assertEquals("v3", rs.getString(2));
assertFalse(rs.next());
-
- conn.createStatement().execute("DROP TABLE " + dataTableFullName);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 405fc0c..f0b2678 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -23,9 +23,11 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER
import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -55,6 +58,7 @@ import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
/**
* Do all the work of managing local index updates for a transactional table from a single coprocessor. Since the transaction
@@ -177,8 +181,29 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
env.getRegionInfo().getEndKey());
try (HTableInterface htable = env.getTable(env.getRegionInfo().getTable())) {
// get the index updates for all elements in this batch
- context.indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
+ indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
}
+ byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+ Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+ List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+ while(indexUpdatesItr.hasNext()) {
+ Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+ if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
+ // These mutations will not go through the preDelete hooks, so we
+ // must manually convert them here.
+ Mutation mutation = TransactionUtil.convertIfDelete(next.getFirst());
+ localUpdates.add(mutation);
+ indexUpdatesItr.remove();
+ }
+ }
+ if (!localUpdates.isEmpty()) {
+ miniBatchOp.addOperationsFromCP(0,
+ localUpdates.toArray(new Mutation[localUpdates.size()]));
+ }
+ if (!indexUpdates.isEmpty()) {
+ context.indexUpdates = indexUpdates;
+ }
+
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
} catch (Throwable t) {
@@ -204,7 +229,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
if (success) { // if miniBatchOp was successfully written, write index updates
if (!context.indexUpdates.isEmpty()) {
- this.writer.write(context.indexUpdates, true);
+ this.writer.write(context.indexUpdates, false);
}
current.addTimelineAnnotation("Wrote index updates");
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index 8b3fc1d..37050fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -19,8 +19,13 @@ package org.apache.phoenix.transaction;
import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.tephra.TxConstants;
public class TransactionFactory {
@@ -140,4 +145,12 @@ public class TransactionFactory {
return table;
}
+
+ /**
+  * Builds the cell used as a transactional delete marker for a whole column family.
+  * The cell has type Put, uses {@code TxConstants.FAMILY_DELETE_QUALIFIER} as its
+  * qualifier and carries an empty value.
+  *
+  * @param row       row key of the marker cell
+  * @param family    column family being marked as deleted
+  * @param timestamp timestamp to stamp on the marker cell
+  * @return the newly created marker cell
+  */
+ public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+     final byte[] markerQualifier = TxConstants.FAMILY_DELETE_QUALIFIER;
+     final byte putTypeCode = KeyValue.Type.Put.getCode();
+     final byte[] emptyValue = HConstants.EMPTY_BYTE_ARRAY;
+     return CellUtil.createCell(row, family, markerQualifier, timestamp, putTypeCode, emptyValue);
+ }
+
+ /**
+  * Builds the cell used as a transactional delete marker for a single column.
+  * The cell has type Put, keeps the given qualifier and carries an empty value.
+  *
+  * @param row       row key of the marker cell
+  * @param family    column family of the column being marked as deleted
+  * @param qualifier qualifier of the column being marked as deleted
+  * @param timestamp timestamp to stamp on the marker cell
+  * @return the newly created marker cell
+  */
+ public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+     final byte putTypeCode = KeyValue.Type.Put.getCode();
+     final byte[] emptyValue = HConstants.EMPTY_BYTE_ARRAY;
+     return CellUtil.createCell(row, family, qualifier, timestamp, putTypeCode, emptyValue);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index a99c700..9cd5829 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -17,12 +17,18 @@
*/
package org.apache.phoenix.util;
+import java.io.IOException;
import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -89,4 +95,45 @@ public class TransactionUtil {
timestamp = convertToMilliseconds(mutationState.getInitialWritePointer());
return timestamp;
}
+
+ // Convert HBase Delete into Put so that it can be undone if transaction is rolled back
+ public static Mutation convertIfDelete(Mutation mutation) throws IOException {
+ if (mutation instanceof Delete) {
+ Put deleteMarker = null;
+ for (byte[] family : mutation.getFamilyCellMap().keySet()) {
+ List<Cell> familyCells = mutation.getFamilyCellMap().get(family);
+ if (familyCells.size() == 1) {
+ if (CellUtil.isDeleteFamily(familyCells.get(0))) {
+ if (deleteMarker == null) {
+ deleteMarker = new Put(mutation.getRow());
+ }
+ deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteFamilyMarker(
+ deleteMarker.getRow(),
+ family,
+ familyCells.get(0).getTimestamp()));
+ }
+ } else {
+ for (Cell cell : familyCells) {
+ if (CellUtil.isDeleteColumns(cell)) {
+ if (deleteMarker == null) {
+ deleteMarker = new Put(mutation.getRow());
+ }
+ deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteColumnMarker(
+ deleteMarker.getRow(),
+ family,
+ CellUtil.cloneQualifier(cell),
+ cell.getTimestamp()));
+ }
+ }
+ }
+ if (deleteMarker != null) {
+ for (Map.Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
+ deleteMarker.setAttribute(entry.getKey(), entry.getValue());
+ }
+ mutation = deleteMarker;
+ }
+ }
+ }
+ return mutation;
+ }
}