You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by la...@apache.org on 2017/04/20 00:41:29 UTC

phoenix git commit: Apply Local Indexes batch updates only once.

Repository: phoenix
Updated Branches:
  refs/heads/master ee886bab9 -> 5bd7f79b5


Apply Local Indexes batch updates only once.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5bd7f79b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5bd7f79b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5bd7f79b

Branch: refs/heads/master
Commit: 5bd7f79b51309505a19f854d05cb000f5cd1eb9f
Parents: ee886ba
Author: Lars Hofhansl <la...@apache.org>
Authored: Wed Apr 19 17:41:00 2017 -0700
Committer: Lars Hofhansl <la...@apache.org>
Committed: Wed Apr 19 17:41:00 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/LocalIndexIT.java     | 30 ++++++++
 .../org/apache/phoenix/hbase/index/Indexer.java | 73 +++++++-------------
 2 files changed, 56 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5bd7f79b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index a7d0028..8d3316b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -617,6 +617,36 @@ public class LocalIndexIT extends BaseLocalIndexIT {
         }
     }
 
+    @Test
+    public void testLocalGlobalIndexMix() throws Exception {
+        if (isNamespaceMapped) { return; }
+        String tableName = generateUniqueName();
+        Connection conn1 = DriverManager.getConnection(getUrl());
+        String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
+                "k1 INTEGER NOT NULL,\n" +
+                "k2 INTEGER NOT NULL,\n" +
+                "k3 INTEGER,\n" +
+                "v1 VARCHAR,\n" +
+                "v2 VARCHAR,\n" +
+                "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
+        conn1.createStatement().execute(ddl);
+        conn1.createStatement().execute("CREATE LOCAL INDEX LV1 ON " + tableName + "(v1)");
+        conn1.createStatement().execute("CREATE INDEX GV2 ON " + tableName + "(v2)");
+
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z','3')");
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a','0')");
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a','2')");
+        conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c','1')");
+        conn1.commit();
+        ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v1 = 'c'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v2 = '2'");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        conn1.close();
+    }
+
     private void copyLocalIndexHFiles(Configuration conf, HRegionInfo fromRegion, HRegionInfo toRegion, boolean move)
             throws IOException {
         Path root = FSUtils.getRootDir(conf);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5bd7f79b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index de98051..9fc76e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -372,7 +372,7 @@ public class Indexer extends BaseRegionObserver {
       super.postPut(e, put, edit, durability);
           return;
         }
-    doPost(edit, put, durability, true, false);
+    doPost(edit, put, durability);
   }
 
   @Override
@@ -382,29 +382,10 @@ public class Indexer extends BaseRegionObserver {
       super.postDelete(e, delete, edit, durability);
           return;
         }
-    doPost(edit, delete, durability, true, false);
+    doPost(edit, delete, durability);
   }
 
   @Override
-  public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-      if (this.disabled) {
-        super.postBatchMutate(c, miniBatchOp);
-        return;
-      }
-      WALEdit edit = miniBatchOp.getWalEdit(0);
-      if (edit != null) {
-        IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
-        if (ikv != null) {
-          // This will prevent the postPut and postDelete hooks from doing anything
-          // We need to do this now, as the postBatchMutateIndispensably (where the
-          // actual index writing gets done) is called after the postPut and postDelete.
-          ikv.markBatchFinished();
-        }
-      }
-  }
-  
-  @Override
   public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
       if (this.disabled) {
@@ -417,17 +398,13 @@ public class Indexer extends BaseRegionObserver {
         //each batch operation, only the first one will have anything useful, so we can just grab that
         Mutation mutation = miniBatchOp.getOperation(0);
         WALEdit edit = miniBatchOp.getWalEdit(0);
-        // We're forcing the index writes here because we've marked the index batch as "finished"
-        // to prevent postPut and postDelete from doing anything, but hold off on writing them
-        // until now so we're outside of the MVCC lock (see PHOENIX-3789). Without this hacky
-        // forceWrite flag, we'd ignore them again here too.
-        doPost(edit, mutation, mutation.getDurability(), false, true);
+        doPost(edit, mutation, mutation.getDurability());
     }
   }
 
-  private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) throws IOException {
+  private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
     try {
-      doPostWithExceptions(edit, m, durability, allowLocalUpdates, forceWrite);
+      doPostWithExceptions(edit, m, durability);
       return;
     } catch (Throwable e) {
       rethrowIndexingException(e);
@@ -436,7 +413,7 @@ public class Indexer extends BaseRegionObserver {
         "Somehow didn't complete the index update, but didn't return succesfully either!");
   }
 
-  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite)
+  private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
           throws Exception {
       //short circuit, if we don't need to do any work
       if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
@@ -470,30 +447,32 @@ public class Indexer extends BaseRegionObserver {
            * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
            * lead to writing all the index updates for each Put/Delete).
            */
-          if ((!ikv.getBatchFinished() || forceWrite) || allowLocalUpdates) {
+          if (!ikv.getBatchFinished()) {
               Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);
 
               // the WAL edit is kept in memory and we already specified the factory when we created the
               // references originally - therefore, we just pass in a null factory here and use the ones
               // already specified on each reference
               try {
-            	  if (!ikv.getBatchFinished() || forceWrite) {
-            		  current.addTimelineAnnotation("Actually doing index update for first time");
-            		  writer.writeAndKillYourselfOnFailure(indexUpdates, allowLocalUpdates);
-            	  } else if (allowLocalUpdates) {
-            		  Collection<Pair<Mutation, byte[]>> localUpdates =
-            				  new ArrayList<Pair<Mutation, byte[]>>();
-            		  current.addTimelineAnnotation("Actually doing local index update for first time");
-            		  for (Pair<Mutation, byte[]> mutation : indexUpdates) {
-            			  if (Bytes.toString(mutation.getSecond()).equals(
-            					  environment.getRegion().getTableDesc().getNameAsString())) {
-            				  localUpdates.add(mutation);
-            			  }
-            		  }
-                      if(!localUpdates.isEmpty()) {
-                    	  writer.writeAndKillYourselfOnFailure(localUpdates, allowLocalUpdates);
-                      }
-            	  }
+        		  current.addTimelineAnnotation("Actually doing index update for first time");
+                  Collection<Pair<Mutation, byte[]>> localUpdates =
+                          new ArrayList<Pair<Mutation, byte[]>>();
+                  Collection<Pair<Mutation, byte[]>> remoteUpdates =
+                          new ArrayList<Pair<Mutation, byte[]>>();
+        		  for (Pair<Mutation, byte[]> mutation : indexUpdates) {
+        			  if (Bytes.toString(mutation.getSecond()).equals(
+        					  environment.getRegion().getTableDesc().getNameAsString())) {
+        				  localUpdates.add(mutation);
+        			  } else {
+                          remoteUpdates.add(mutation);
+        			  }
+        		  }
+                  if(!remoteUpdates.isEmpty()) {
+                      writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
+                  }
+                  if(!localUpdates.isEmpty()) {
+                      writer.writeAndKillYourselfOnFailure(localUpdates, true);
+                  }
               } finally {                  // With a custom kill policy, we may throw instead of kill the server.
                   // Without doing this in a finally block (at least with the mini cluster),
                   // the region server never goes down.