You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/02/06 22:03:09 UTC

[lucene-solr] branch branch_8x updated (9bafc49 -> 69b5a04)

This is an automated email from the ASF dual-hosted git repository.

hossman pushed a change to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from 9bafc49  Removed some unused variables from DistributedUpdateProcessor
     new a383362  Harden OrderedExecutorTest to use concurrent latches/barriers for testing parallelism instead of making assumpions about how milliseconds something should take in another thread
     new 69b5a04  SOLR-13210: Fix TriLevelCompositeIdRoutingTest to actually make sense

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../solr/cloud/TriLevelCompositeIdRoutingTest.java | 159 ++++++++++-----------
 .../org/apache/solr/util/OrderedExecutorTest.java  | 159 +++++++++++++++++----
 2 files changed, 207 insertions(+), 111 deletions(-)


[lucene-solr] 01/02: Harden OrderedExecutorTest to use concurrent latches/barriers for testing parallelism instead of making assumpions about how milliseconds something should take in another thread

Posted by ho...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit a38336285d684feb1cfede2732f493565b5a9d78
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Wed Feb 6 14:32:12 2019 -0700

    Harden OrderedExecutorTest to use concurrent latches/barriers for testing parallelism instead of making assumpions about how milliseconds something should take in another thread
    
    (cherry picked from commit ea2956fda3c23695daa43c6cb6f1c7b2a345ce27)
---
 .../org/apache/solr/util/OrderedExecutorTest.java  | 159 +++++++++++++++++----
 1 file changed, 133 insertions(+), 26 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 0211a11..929cc72 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -17,15 +17,29 @@
 
 package org.apache.solr.util;
 
+import java.lang.invoke.MethodHandles;
+
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class OrderedExecutorTest extends LuceneTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Test
   public void testExecutionInOrder() {
@@ -40,44 +54,137 @@ public class OrderedExecutorTest extends LuceneTestCase {
 
   @Test
   public void testLockWhenQueueIsFull() {
-    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
-    IntBox intBox = new IntBox();
-    long t = System.nanoTime();
-    orderedExecutor.execute(1, () -> {
+    final ExecutorService controlExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_control");
+    final OrderedExecutor orderedExecutor = new OrderedExecutor
+      (10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_test"));
+    
+    try {
+      // AAA and BBB events will both depend on the use of the same lockId
+      final BlockingQueue<String> events = new ArrayBlockingQueue<>(2);
+      final Integer lockId = 1;
+      
+      // AAA enters executor first so it should execute first (even though it's waiting on latch)
+      final CountDownLatch latchAAA = new CountDownLatch(1);
+      orderedExecutor.execute(lockId, () -> {
+          try {
+            if (latchAAA.await(120, TimeUnit.SECONDS)) {
+              events.add("AAA");
+            } else {
+              events.add("AAA Timed Out");
+            }
+          } catch (InterruptedException e) {
+            log.error("Interrupt in AAA worker", e);
+            Thread.currentThread().interrupt();
+          }
+        });
+      // BBB doesn't care about the latch, but because it uses the same lockId, it's blocked on AAA
+      // so we execute it in a background thread...
+      controlExecutor.execute(() -> {
+          orderedExecutor.execute(lockId, () -> {
+              events.add("BBB");
+            });
+        });
+      
+      // now if we release the latchAAA, AAA should be garunteed to fire first, then BBB
+      latchAAA.countDown();
       try {
-        Thread.sleep(500L);
+        assertEquals("AAA", events.poll(120, TimeUnit.SECONDS));
+        assertEquals("BBB", events.poll(120, TimeUnit.SECONDS));
       } catch (InterruptedException e) {
+        log.error("Interrupt polling event queue", e);
         Thread.currentThread().interrupt();
+        fail("interupt while trying to poll event queue");
       }
-      intBox.value++;
-    });
-    assertTrue(System.nanoTime() - t < 100 * 1000000);
-
-    t = System.nanoTime();
-    orderedExecutor.execute(1, () -> {
-      intBox.value++;
-    });
-    assertTrue(System.nanoTime() - t > 300 * 1000000);
-    orderedExecutor.shutdownAndAwaitTermination();
-    assertEquals(intBox.value, 2);
+    } finally {
+      ExecutorUtil.shutdownAndAwaitTermination(controlExecutor);
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
   }
 
   @Test
   public void testRunInParallel() {
-    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
-    AtomicInteger atomicInteger = new AtomicInteger(0);
-    orderedExecutor.execute(1, () -> {
+    final int parallelism = atLeast(3);
+    
+    final ExecutorService controlExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_control");
+    final OrderedExecutor orderedExecutor = new OrderedExecutor
+      (parallelism, ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_test"));
+
+    try {
+      // distinct lockIds should be able to be used in parallel, up to the size of the executor,
+      // w/o any execute calls blocking... until the test Runables being executed are all
+      // waiting on the same cyclic barrier...
+      final CyclicBarrier barrier = new CyclicBarrier(parallelism + 1);
+      final CountDownLatch preBarrierLatch = new CountDownLatch(parallelism);
+      final CountDownLatch postBarrierLatch = new CountDownLatch(parallelism);
+      
+      for (int i = 0; i < parallelism; i++) {
+        final int lockId = i;
+        controlExecutor.execute(() -> {
+            orderedExecutor.execute(lockId, () -> {
+                try {
+                  log.info("Worker #{} starting", lockId);
+                  preBarrierLatch.countDown();
+                  barrier.await(120, TimeUnit.SECONDS);
+                  postBarrierLatch.countDown();
+                } catch (TimeoutException t) {
+                  log.error("Timeout in worker#" + lockId + "awaiting barrier", t);
+                } catch (BrokenBarrierException b) {
+                  log.error("Broken Barrier in worker#" + lockId, b);
+                } catch (InterruptedException e) {
+                  log.error("Interrupt in worker#" + lockId + "awaiting barrier", e);
+                  Thread.currentThread().interrupt();
+                }
+              });
+          });
+      }
+
+      log.info("main thread: about to wait on pre-barrier latch, barrier={}, post-barrier latch={}",
+               barrier.getNumberWaiting(), postBarrierLatch.getCount());
+      
       try {
-        Thread.sleep(500L);
+        // this latch should have fully counted down by now
+        // (or with a small await for thread scheduling but no other external action)
+        assertTrue("Timeout awaiting pre barrier latch",
+                   preBarrierLatch.await(120, TimeUnit.SECONDS));
       } catch (InterruptedException e) {
+        log.error("Interrupt awwaiting pre barrier latch", e);
         Thread.currentThread().interrupt();
+        fail("interupt while trying to await the preBarrierLatch");
       }
-      if (atomicInteger.get() == 1) atomicInteger.incrementAndGet();
-    });
+      
+      log.info("main thread: pre-barrier latch done, barrier={}, post-barrier latch={}",
+               barrier.getNumberWaiting(), postBarrierLatch.getCount());
+      
+      // nothing should have counted down yet on the postBarrierLatch
+      assertEquals(parallelism, postBarrierLatch.getCount());
 
-    orderedExecutor.execute(2, atomicInteger::incrementAndGet);
-    orderedExecutor.shutdownAndAwaitTermination();
-    assertEquals(atomicInteger.get(), 2);
+      try {
+        // if we now await on the the barrier, it should release
+        // (once all other threads get to the barrier as well, but no external action needed)
+        barrier.await(120, TimeUnit.SECONDS);
+        
+        log.info("main thread: barrier has released, post-barrier latch={}",
+                 postBarrierLatch.getCount());
+        
+        // and now the post-barrier latch should release immediately
+        // (or with a small await for thread scheduling but no other external action)
+        assertTrue("Timeout awaiting post barrier latch",
+                   postBarrierLatch.await(120, TimeUnit.SECONDS));
+      } catch (TimeoutException t) {
+        log.error("Timeout awaiting barrier", t);
+        fail("barrier timed out");
+      } catch (BrokenBarrierException b) {
+        log.error("Broken Barrier in main test thread", b);
+        fail("broken barrier while trying to release the barrier");
+      } catch (InterruptedException e) {
+        log.error("Interrupt awwaiting barrier / post barrier latch", e);
+        Thread.currentThread().interrupt();
+        fail("interupt while trying to release the barrier and await the postBarrierLatch");
+      }
+    } finally {
+      ExecutorUtil.shutdownAndAwaitTermination(controlExecutor);
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
   }
 
   @Test


[lucene-solr] 02/02: SOLR-13210: Fix TriLevelCompositeIdRoutingTest to actually make sense

Posted by ho...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 69b5a04a4dfc9684554b5fde82f1107d006847a1
Author: Chris Hostetter <ho...@apache.org>
AuthorDate: Wed Feb 6 14:42:30 2019 -0700

    SOLR-13210: Fix TriLevelCompositeIdRoutingTest to actually make sense
    
    (cherry picked from commit 87ad59f826d3ea5ea0e6239397c1d9a8acf59323)
---
 .../solr/cloud/TriLevelCompositeIdRoutingTest.java | 159 ++++++++++-----------
 1 file changed, 74 insertions(+), 85 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/TriLevelCompositeIdRoutingTest.java b/solr/core/src/test/org/apache/solr/cloud/TriLevelCompositeIdRoutingTest.java
index 05a0ab9..a18f88c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TriLevelCompositeIdRoutingTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TriLevelCompositeIdRoutingTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.solr.cloud;
 
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.lucene.util.TestUtil;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
@@ -24,18 +31,15 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
 
 public class TriLevelCompositeIdRoutingTest extends ShardRoutingTest {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  final int NUM_APPS;
-  final int NUM_USERS;
-  final int NUM_DOCS;
+  final int MAX_APP_ID;
+  final int MAX_USER_ID;
+  final int MAX_DOC_ID;
+  final int NUM_ADDS;
 
 
   @BeforeClass
@@ -51,14 +55,14 @@ public class TriLevelCompositeIdRoutingTest extends ShardRoutingTest {
   public TriLevelCompositeIdRoutingTest() {
     schemaString = "schema15.xml";      // we need a string id
     
-    
     sliceCount = TestUtil.nextInt(random(), 1, (TEST_NIGHTLY ? 5 : 3)); // this is the number of *SHARDS*
     int replicationFactor = rarely() ? 2 : 1; // replication is not the focus of this test
     fixShardCount(replicationFactor * sliceCount); // total num cores, one per node
-    
-    NUM_APPS = atLeast(5);
-    NUM_USERS = atLeast(10);
-    NUM_DOCS = atLeast(100);
+
+    MAX_APP_ID = atLeast(5);
+    MAX_USER_ID = atLeast(10);
+    MAX_DOC_ID = atLeast(20);
+    NUM_ADDS = atLeast(200);
   }
 
   @Test
@@ -71,11 +75,57 @@ public class TriLevelCompositeIdRoutingTest extends ShardRoutingTest {
       // todo: do I have to do this here?
       waitForRecoveriesToFinish(true);
 
-      doTriLevelHashingTest();
-      del("*:*");
+      // NOTE: we might randomly generate the same uniqueKey value multiple times,
+      // (which is a valid test case, they should route to the same shard both times)
+      // so we track each expectedId in a set for later sanity checking
+      final Set<String> expectedUniqueKeys = new HashSet<>();
+      for (int i = 0; i < NUM_ADDS; i++) {
+        final int appId = r.nextInt(MAX_APP_ID) + 1;
+        final int userId = r.nextInt(MAX_USER_ID) + 1;
+        // skew the odds so half the time we have no mask, and half the time we
+        // have an even distribution of 1-16 bits
+        final int bitMask = Math.max(0, r.nextInt(32)-15);
+        
+        String id = "app" + appId + (bitMask <= 0 ? "" : ("/" + bitMask))
+          + "!" + "user" + userId
+          + "!" + "doc" + r.nextInt(MAX_DOC_ID);
+        
+        doAddDoc(id);
+        expectedUniqueKeys.add(id);
+      }
+      
       commit();
-      doTriLevelHashingTestWithBitMask();
+      
+      final Map<String, String> routePrefixMap = new HashMap<>();
+      final Set<String> actualUniqueKeys = new HashSet<>();
+      for (int i = 1; i <= sliceCount; i++) {
+        final String shardId = "shard" + i;
+        final Set<String> uniqueKeysInShard = fetchUniqueKeysFromShard(shardId);
+        
+        { // sanity check our uniqueKey values aren't duplicated across shards
+          final Set<String> uniqueKeysOnDuplicateShards = new HashSet<>(uniqueKeysInShard);
+          uniqueKeysOnDuplicateShards.retainAll(actualUniqueKeys);
+          assertEquals(shardId + " contains some uniqueKeys that were already found on a previous shard",
+                       Collections.emptySet(),  uniqueKeysOnDuplicateShards);
+          actualUniqueKeys.addAll(uniqueKeysInShard);
+        }
+        
+        // foreach uniqueKey, extract it's route prefix and confirm those aren't spread across multiple shards
+        for (String uniqueKey : uniqueKeysInShard) {
+          final String routePrefix = uniqueKey.substring(0, uniqueKey.lastIndexOf('!'));
+          log.debug("shard( {} ) : uniqueKey( {} ) -> routePrefix( {} )", shardId, uniqueKey, routePrefix);
+          assertNotNull("null prefix WTF? " + uniqueKey, routePrefix);
+          
+          final String otherShard = routePrefixMap.put(routePrefix, shardId);
+          if (null != otherShard)
+            // if we already had a mapping, make sure it's an earlier doc from our current shard...
+            assertEquals("routePrefix " + routePrefix + " found in multiple shards",
+                         shardId, otherShard);
+        }
+      }
 
+      assertEquals("Docs missing?", expectedUniqueKeys.size(), actualUniqueKeys.size());
+      
       testFinished = true;
     } finally {
       if (!testFinished) {
@@ -83,82 +133,21 @@ public class TriLevelCompositeIdRoutingTest extends ShardRoutingTest {
       }
     }
   }
-
-  private void doTriLevelHashingTest() throws Exception {
-    log.info("### STARTING doTriLevelHashingTest");
-    // for now,  we know how ranges will be distributed to shards.
-    // may have to look it up in clusterstate if that assumption changes.
-
-    for (int i = 0; i < NUM_DOCS; i++) {
-      int appId = r.nextInt(NUM_APPS) + 1;
-      int userId = r.nextInt(NUM_USERS) + 1;
-
-      String id = "app" + appId + "!" + "user" + userId + "!" + "doc" + r.nextInt(100);
-      doAddDoc(id);
-
-    }
-
-    commit();
-
-    HashMap<String, Integer> idMap = new HashMap<>();
-
-    for (int i = 1; i <= sliceCount; i++) {
-
-      Set<String> ids = doQueryGetUniqueIdKeys("q", "*:*", "rows", ""+NUM_DOCS, "shards", "shard" + i);
-      for (String id : ids) {
-        assertFalse("Found the same route key [" + id + "] in 2 shards.", idMap.containsKey(id));
-        idMap.put(getKey(id), i);
-      }
-    }
-
-  }
-
-
-  private void doTriLevelHashingTestWithBitMask() throws Exception {
-    log.info("### STARTING doTriLevelHashingTestWithBitMask");
-    // for now,  we know how ranges will be distributed to shards.
-    // may have to look it up in clusterstate if that assumption changes.
-
-    for (int i = 0; i < NUM_DOCS; i++) {
-      int appId = r.nextInt(NUM_APPS) + 1;
-      int userId = r.nextInt(NUM_USERS) + 1;
-      int bitMask = r.nextInt(16) + 1;
-
-      String id = "app" + appId + "/" + bitMask + "!" + "user" + userId + "!" + "doc" + r.nextInt(100);
-      doAddDoc(id);
-
-    }
-
-    commit();
-
-    HashMap<String, Integer> idMap = new HashMap<>();
-
-    for (int i = 1; i <= sliceCount; i++) {
-
-      Set<String> ids = doQueryGetUniqueIdKeys("q", "*:*", "rows", ""+NUM_DOCS, "shards", "shard" + i);
-      for (String id : ids) {
-        assertFalse("Found the same route key [" + id + "] in 2 shards.", idMap.containsKey(id));
-        idMap.put(getKey(id), i);
-      }
-    }
-
-  }
-
+  
   void doAddDoc(String id) throws Exception {
     index("id", id);
     // todo - target diff servers and use cloud clients as well as non-cloud clients
   }
 
-  Set<String> doQueryGetUniqueIdKeys(String... queryParams) throws Exception {
-    QueryResponse rsp = cloudClient.query(params(queryParams));
-    Set<String> obtainedIdKeys = new HashSet<>();
+  private Set<String> fetchUniqueKeysFromShard(final String shardId) throws Exception {
+    // NUM_ADDS is an absolute upper bound on the num docs in the index
+    QueryResponse rsp = cloudClient.query(params("q", "*:*", "rows", ""+NUM_ADDS, "shards", shardId));
+    Set<String> uniqueKeys = new HashSet<>();
     for (SolrDocument doc : rsp.getResults()) {
-      obtainedIdKeys.add(getKey((String) doc.get("id")));
+      final String id = (String) doc.get("id");
+      assertNotNull("null id WTF? " + doc.toString(), id);
+      uniqueKeys.add(id);
     }
-    return obtainedIdKeys;
-  }
-
-  private String getKey(String id) {
-    return id.substring(0, id.lastIndexOf('!'));
+    return uniqueKeys;
   }
 }