You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/03/22 16:57:20 UTC

[2/4] phoenix git commit: PHOENIX-4659 Use coprocessor API to write local transactional indexes

PHOENIX-4659 Use coprocessor API to write local transactional indexes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b61e72f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b61e72f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b61e72f0

Branch: refs/heads/4.x-HBase-1.3
Commit: b61e72f007e46199ef27bd3308bec4f4bf448c67
Parents: 5814fcb
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Mar 17 14:18:59 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Mar 22 09:45:35 2018 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |  7 ++-
 .../end2end/index/txn/TxWriteFailureIT.java     |  6 +--
 .../index/PhoenixTransactionalIndexer.java      | 29 +++++++++++-
 .../phoenix/transaction/TransactionFactory.java | 13 ++++++
 .../apache/phoenix/util/TransactionUtil.java    | 47 ++++++++++++++++++++
 5 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 38bde7f..dfbaf3f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -469,6 +469,7 @@ public class MutableIndexFailureIT extends BaseTest {
         // uncommitted data when the DELETE is executed.
         FailingRegionObserver.FAIL_WRITE = true;
         try {
+            FailingRegionObserver.FAIL_NEXT_WRITE = localIndex && transactional;
             conn.commit();
             if (commitShouldFail && (!localIndex || transactional) && this.throwIndexWriteFailure) {
                 fail();
@@ -504,13 +505,17 @@ public class MutableIndexFailureIT extends BaseTest {
 
     public static class FailingRegionObserver extends SimpleRegionObserver {
         public static volatile boolean FAIL_WRITE = false;
+        public static volatile boolean FAIL_NEXT_WRITE = false;
         public static final String FAIL_INDEX_NAME = "FAIL_IDX";
         public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
 
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
             boolean throwException = false;
-            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
+            if (FAIL_NEXT_WRITE) {
+                throwException = true;
+                FAIL_NEXT_WRITE = false;
+            } else if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
                     && FAIL_WRITE) {
                 throwException = true;
             } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
index ec60151..049611c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
@@ -105,7 +105,9 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
 	
 	@Test
     public void testIndexTableWriteFailure() throws Exception {
-        helpTestWriteFailure(true);
+	    if (!localIndex) { // We cannot fail the index write for local indexes because of the way they're written
+	        helpTestWriteFailure(true);
+	    }
 	}
 	
 	@Test
@@ -175,8 +177,6 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
         assertEquals("k3", rs.getString(1));
         assertEquals("v3", rs.getString(2));
         assertFalse(rs.next());
-        
-        conn.createStatement().execute("DROP TABLE " + dataTableFullName);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 405fc0c..f0b2678 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -23,9 +23,11 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER
 import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -55,6 +58,7 @@ import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 /**
  * Do all the work of managing local index updates for a transactional table from a single coprocessor. Since the transaction
@@ -177,8 +181,29 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                     env.getRegionInfo().getEndKey());
             try (HTableInterface htable = env.getTable(env.getRegionInfo().getTable())) {
                 // get the index updates for all elements in this batch
-                context.indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
+                indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
             }
+            byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+            Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+            List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+            while(indexUpdatesItr.hasNext()) {
+                Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+                if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
+                    // These mutations will not go through the preDelete hooks, so we
+                    // must manually convert them here.
+                    Mutation mutation = TransactionUtil.convertIfDelete(next.getFirst());
+                    localUpdates.add(mutation);
+                    indexUpdatesItr.remove();
+                }
+            }
+            if (!localUpdates.isEmpty()) {
+                miniBatchOp.addOperationsFromCP(0,
+                    localUpdates.toArray(new Mutation[localUpdates.size()]));
+            }
+            if (!indexUpdates.isEmpty()) {
+                context.indexUpdates = indexUpdates;
+            }
+
             current.addTimelineAnnotation("Built index updates, doing preStep");
             TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
         } catch (Throwable t) {
@@ -204,7 +229,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
             if (success) { // if miniBatchOp was successfully written, write index updates
                 if (!context.indexUpdates.isEmpty()) {
-                    this.writer.write(context.indexUpdates, true);
+                    this.writer.write(context.indexUpdates, false);
                 }
                 current.addTimelineAnnotation("Wrote index updates");
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index 8b3fc1d..37050fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -19,8 +19,13 @@ package org.apache.phoenix.transaction;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.tephra.TxConstants;
 
 public class TransactionFactory {
 
@@ -140,4 +145,12 @@ public class TransactionFactory {
         
         return table;
     }
+    
+    public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
+        return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    }
+    
+    public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
+        return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b61e72f0/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index a99c700..9cd5829 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -17,12 +17,18 @@
  */
 package org.apache.phoenix.util;
 
+import java.io.IOException;
 import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -89,4 +95,45 @@ public class TransactionUtil {
 		timestamp = convertToMilliseconds(mutationState.getInitialWritePointer());
 		return timestamp;
 	}
+	
+    // Convert HBase Delete into Put so that it can be undone if transaction is rolled back
+	public static Mutation convertIfDelete(Mutation mutation) throws IOException {
+        if (mutation instanceof Delete) {
+            Put deleteMarker = null;
+            for (byte[] family : mutation.getFamilyCellMap().keySet()) {
+                List<Cell> familyCells = mutation.getFamilyCellMap().get(family);
+                if (familyCells.size() == 1) {
+                    if (CellUtil.isDeleteFamily(familyCells.get(0))) {
+                        if (deleteMarker == null) {
+                            deleteMarker = new Put(mutation.getRow());
+                        }
+                        deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteFamilyMarker(
+                                deleteMarker.getRow(), 
+                                family, 
+                                familyCells.get(0).getTimestamp()));
+                    }
+                } else {
+                    for (Cell cell : familyCells) {
+                        if (CellUtil.isDeleteColumns(cell)) {
+                            if (deleteMarker == null) {
+                                deleteMarker = new Put(mutation.getRow());
+                            }
+                            deleteMarker.add(TransactionFactory.getTransactionFactory().newDeleteColumnMarker(
+                                    deleteMarker.getRow(),
+                                    family,
+                                    CellUtil.cloneQualifier(cell), 
+                                    cell.getTimestamp()));
+                        }
+                    }
+                }
+                if (deleteMarker != null) {
+                    for (Map.Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
+                        deleteMarker.setAttribute(entry.getKey(), entry.getValue());
+                    }
+                    mutation = deleteMarker;
+                }
+            }
+        }
+        return mutation;
+	}
 }