You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2017/05/24 05:51:35 UTC

phoenix git commit: PHOENIX-3827 Make use of HBASE-15600 to write local index mutations along with data mutations atomically (Rajeshbabu)

Repository: phoenix
Updated Branches:
  refs/heads/master dfb2586af -> a2f4d7eeb


PHOENIX-3827 Make use of HBASE-15600 to write local index mutations along with data mutations atomically (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a2f4d7ee
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a2f4d7ee
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a2f4d7ee

Branch: refs/heads/master
Commit: a2f4d7eebec621b58204a9eb78d552f18dcbcf24
Parents: dfb2586
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
Authored: Wed May 24 11:21:12 2017 +0530
Committer: Rajeshbabu Chintaguntla <ra...@apache.org>
Committed: Wed May 24 11:21:12 2017 +0530

----------------------------------------------------------------------
 .../end2end/IndexToolForPartialBuildIT.java     | 41 +++--------
 ...olForPartialBuildWithNamespaceEnabledIT.java | 13 ++--
 .../end2end/index/MutableIndexFailureIT.java    |  6 +-
 .../org/apache/phoenix/hbase/index/Indexer.java | 72 +++++++-------------
 4 files changed, 45 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 59a9106..83bda64 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -64,9 +64,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -74,16 +71,12 @@ import com.google.common.collect.Maps;
 /**
  * Tests for the {@link IndexToolForPartialBuildIT}
  */
-@RunWith(Parameterized.class)
 public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
     
-    private final boolean localIndex;
     protected boolean isNamespaceEnabled = false;
     protected final String tableDDLOptions;
     
-    public IndexToolForPartialBuildIT(boolean localIndex) {
-
-        this.localIndex = localIndex;
+    public IndexToolForPartialBuildIT() {
         StringBuilder optionBuilder = new StringBuilder();
         optionBuilder.append(" SPLIT ON(1,2)");
         this.tableDDLOptions = optionBuilder.toString();
@@ -108,13 +101,6 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
     }
     
-    @Parameters(name="localIndex = {0}")
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-                 { false},{ true }
-           });
-    }
-    
     @Test
     public void testSecondaryIndex() throws Exception {
         String schemaName = generateUniqueName();
@@ -142,8 +128,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
             upsertRow(stmt1, 2000);
 
             conn.commit();
-            stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
-                    (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+            stmt.execute(String.format("CREATE INDEX %s ON %s  (LPAD(UPPER(NAME),11,'x')||'_xyz') ", indxTable, fullTableName));
             FailingRegionObserver.FAIL_WRITE = true;
             upsertRow(stmt1, 3000);
             upsertRow(stmt1, 4000);
@@ -186,7 +171,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
             String actualExplainPlan = QueryUtil.getExplainPlan(rs);
 
             // assert we are pulling from data table.
-			assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, false, isNamespaceEnabled);
+			assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, isNamespaceEnabled);
 
             rs = stmt1.executeQuery(selectSql);
             for (int i = 1; i <= 7; i++) {
@@ -219,7 +204,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
             // assert we are pulling from index table.
             rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
             actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, localIndex, isNamespaceEnabled);
+            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, isNamespaceEnabled);
 
             rs = stmt.executeQuery(selectSql);
 
@@ -237,21 +222,13 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
     }
     
 	public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
-			String indxTable, boolean isLocal, boolean isNamespaceMapped) {
+			String indxTable, boolean isNamespaceMapped) {
 
 		String expectedExplainPlan = "";
 		if (indxTable != null) {
-			if (isLocal) {
-				final String localIndexName = SchemaUtil
-						.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), isNamespaceMapped,
-								PTableType.INDEX)
-						.getString();
-				expectedExplainPlan = String.format("CLIENT PARALLEL 3-WAY RANGE SCAN OVER %s [1]", localIndexName);
-			} else {
-				expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
-						SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable),
-								isNamespaceMapped, PTableType.INDEX));
-			}
+		    expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+		        SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable),
+		            isNamespaceMapped, PTableType.INDEX));
 		} else {
 			expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
 					SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable),
@@ -270,7 +247,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         args.add(dataTable);
         args.add("-pr");
         args.add("-op");
-        args.add("/tmp/output/partialTable_"+localIndex);
+        args.add("/tmp/output/partialTable_");
         return args.toArray(new String[0]);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
index a8c1f1e..02c2d93 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
@@ -31,11 +33,12 @@ import com.google.common.collect.Maps;
 /**
  * Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled}
  */
+@RunWith(Parameterized.class)
 public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT {
     
-    
-    public IndexToolForPartialBuildWithNamespaceEnabledIT(boolean localIndex, boolean isNamespaceEnabled) {
-        super(localIndex);
+
+    public IndexToolForPartialBuildWithNamespaceEnabledIT(boolean isNamespaceEnabled) {
+        super();
         this.isNamespaceEnabled=isNamespaceEnabled;
     }
     
@@ -49,10 +52,10 @@ public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolFor
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
     
-    @Parameters(name="localIndex = {0} , isNamespaceEnabled = {1}")
+    @Parameters(name="isNamespaceEnabled = {0}")
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false, true},{ true, false }
+                 { true },{ false }
            });
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 853647e..07f587d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -234,7 +234,7 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             // the index is only disabled for non-txn tables upon index table write failure
-            if (transactional || leaveIndexActiveOnFailure) {
+            if (transactional || leaveIndexActiveOnFailure || localIndex) {
                 assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
             } else {
                 String indexState = rs.getString("INDEX_STATE");
@@ -413,7 +413,7 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.execute();
         try {
             conn.commit();
-            if (commitShouldFail) {
+            if (commitShouldFail && !localIndex) {
                 fail();
             }
         } catch (CommitException e) {
@@ -434,7 +434,7 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.execute();
         try {
             conn.commit();
-            if (commitShouldFail) {
+            if (commitShouldFail && !localIndex) {
                 fail();
             }
         } catch (CommitException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a2f4d7ee/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 15e53a3..04deddb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -23,8 +23,8 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -96,6 +96,11 @@ import com.google.common.collect.Multimap;
  * nothing does. Currently, we do not support mixed-durability updates within a single batch. If you
  * want to have different durability levels, you only need to split the updates into two different
  * batches.
+ * <p>
+ * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
+ * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because 
+ * Phoenix always does batch mutations.
+ * <p>
  */
 public class Indexer extends BaseRegionObserver {
 
@@ -225,10 +230,8 @@ public class Indexer extends BaseRegionObserver {
           if (!mutations.isEmpty()) {
               Region region = e.getEnvironment().getRegion();
               // Otherwise, submit the mutations directly here
-              region.mutateRowsWithLocks(
-                      mutations,
-                      Collections.<byte[]>emptyList(), // Rows are already locked
-                      HConstants.NO_NONCE, HConstants.NO_NONCE);
+                region.batchMutate(mutations.toArray(new Mutation[0]), HConstants.NO_NONCE,
+                    HConstants.NO_NONCE);
           }
           return Result.EMPTY_RESULT;
       } catch (Throwable t) {
@@ -320,14 +323,26 @@ public class Indexer extends BaseRegionObserver {
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
-
           // get the index updates for all elements in this batch
           Collection<Pair<Mutation, byte[]>> indexUpdates =
                   this.builder.getIndexUpdate(miniBatchOp, mutations.values());
-
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-
+          byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+          Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+          List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+          while(indexUpdatesItr.hasNext()) {
+              Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+              if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
+                  localUpdates.add(next.getFirst());
+                  indexUpdatesItr.remove();
+              }
+          }
+          if (!localUpdates.isEmpty()) {
+              miniBatchOp.addOperationsFromCP(0,
+                  localUpdates.toArray(new Mutation[localUpdates.size()]));
+          }
+          
           // write them, either to WAL or the index tables
           doPre(indexUpdates, edit, durability);
       }
@@ -366,26 +381,6 @@ public class Indexer extends BaseRegionObserver {
   }
 
   @Override
-  public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
-      final Durability durability) throws IOException {
-      if (this.disabled) {
-      super.postPut(e, put, edit, durability);
-          return;
-        }
-    doPost(edit, put, durability);
-  }
-
-  @Override
-  public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
-      WALEdit edit, final Durability durability) throws IOException {
-      if (this.disabled) {
-      super.postDelete(e, delete, edit, durability);
-          return;
-        }
-    doPost(edit, delete, durability);
-  }
-
-  @Override
   public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
       if (this.disabled) {
@@ -454,25 +449,8 @@ public class Indexer extends BaseRegionObserver {
               // references originally - therefore, we just pass in a null factory here and use the ones
               // already specified on each reference
               try {
-        		  current.addTimelineAnnotation("Actually doing index update for first time");
-                  Collection<Pair<Mutation, byte[]>> localUpdates =
-                          new ArrayList<Pair<Mutation, byte[]>>();
-                  Collection<Pair<Mutation, byte[]>> remoteUpdates =
-                          new ArrayList<Pair<Mutation, byte[]>>();
-        		  for (Pair<Mutation, byte[]> mutation : indexUpdates) {
-        			  if (Bytes.toString(mutation.getSecond()).equals(
-        					  environment.getRegion().getTableDesc().getNameAsString())) {
-        				  localUpdates.add(mutation);
-        			  } else {
-                          remoteUpdates.add(mutation);
-        			  }
-        		  }
-                  if(!remoteUpdates.isEmpty()) {
-                      writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
-                  }
-                  if(!localUpdates.isEmpty()) {
-                      writer.writeAndKillYourselfOnFailure(localUpdates, true);
-                  }
+                  current.addTimelineAnnotation("Actually doing index update for first time");
+                  writer.writeAndKillYourselfOnFailure(indexUpdates, false);
               } finally {                  // With a custom kill policy, we may throw instead of kill the server.
                   // Without doing this in a finally block (at least with the mini cluster),
                   // the region server never goes down.