You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/08/21 01:30:53 UTC

[26/50] [abbrv] lucene-solr:jira/http2: SOLR-12607: Minor refactorings

SOLR-12607: Minor refactorings

Replaced a few private instances with lambdas and extracted common code for retrying splits into a new method


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/57b33c19
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/57b33c19
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/57b33c19

Branch: refs/heads/jira/http2
Commit: 57b33c19a4a8d8fe675c190fd72144c317ed43be
Parents: 94ecb06
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Thu Aug 16 16:07:05 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Thu Aug 16 16:07:05 2018 +0530

----------------------------------------------------------------------
 .../cloud/api/collections/ShardSplitTest.java   | 249 ++++++++-----------
 1 file changed, 110 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57b33c19/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 2b48fbe..6619ee5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -49,7 +49,6 @@ import org.apache.solr.cloud.ChaosMonkey;
 import org.apache.solr.cloud.StoppableIndexingThread;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
@@ -80,8 +79,8 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final String SHARD1_0 = SHARD1 + "_0";
-  public static final String SHARD1_1 = SHARD1 + "_1";
+  private static final String SHARD1_0 = SHARD1 + "_0";
+  private static final String SHARD1_1 = SHARD1 + "_1";
 
   public ShardSplitTest() {
     schemaString = "schema15.xml";      // we need a string id
@@ -161,21 +160,18 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
           waitForRecoveriesToFinish(collectionName, true);
           // let's wait to see parent shard become inactive
           CountDownLatch latch = new CountDownLatch(1);
-          client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
-            @Override
-            public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
-              Slice parent = collectionState.getSlice(SHARD1);
-              Slice slice10 = collectionState.getSlice(SHARD1_0);
-              Slice slice11 = collectionState.getSlice(SHARD1_1);
-              if (slice10 != null && slice11 != null &&
-                  parent.getState() == Slice.State.INACTIVE &&
-                  slice10.getState() == Slice.State.ACTIVE &&
-                  slice11.getState() == Slice.State.ACTIVE) {
-                latch.countDown();
-                return true; // removes the watch
-              }
-              return false;
+          client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
+            Slice parent = collectionState.getSlice(SHARD1);
+            Slice slice10 = collectionState.getSlice(SHARD1_0);
+            Slice slice11 = collectionState.getSlice(SHARD1_1);
+            if (slice10 != null && slice11 != null &&
+                parent.getState() == Slice.State.INACTIVE &&
+                slice10.getState() == Slice.State.ACTIVE &&
+                slice11.getState() == Slice.State.ACTIVE) {
+              latch.countDown();
+              return true; // removes the watch
             }
+            return false;
           });
           latch.await(1, TimeUnit.MINUTES);
           if (latch.getCount() != 0)  {
@@ -211,22 +207,19 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
           }
           if (state == RequestStatusState.COMPLETED)  {
             CountDownLatch newReplicaLatch = new CountDownLatch(1);
-            client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
-              @Override
-              public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
-                if (liveNodes.size() != liveNodeCount)  {
-                  return false;
-                }
-                Slice slice = collectionState.getSlice(SHARD1_0);
-                if (slice.getReplicas().size() == 2)  {
-                  if (!slice.getReplicas().stream().anyMatch(r -> r.getState() == Replica.State.RECOVERING)) {
-                    // we see replicas and none of them are recovering
-                    newReplicaLatch.countDown();
-                    return true;
-                  }
-                }
+            client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
+              if (liveNodes.size() != liveNodeCount)  {
                 return false;
               }
+              Slice slice = collectionState.getSlice(SHARD1_0);
+              if (slice.getReplicas().size() == 2)  {
+                if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) {
+                  // we see replicas and none of them are recovering
+                  newReplicaLatch.countDown();
+                  return true;
+                }
+              }
+              return false;
             });
             newReplicaLatch.await(30, TimeUnit.SECONDS);
             // check consistency of sub-shard replica explicitly because checkShardConsistency methods doesn't
@@ -422,41 +415,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
 
     AtomicBoolean stop = new AtomicBoolean();
     AtomicBoolean killed = new AtomicBoolean(false);
-    Runnable monkey = new Runnable() {
-      @Override
-      public void run() {
-        ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-        zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() {
-          @Override
-          public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
-            if (stop.get()) {
-              return true; // abort and remove the watch
-            }
-            Slice slice = collectionState.getSlice(SHARD1_0);
-            if (slice != null && slice.getReplicas().size() > 1) {
-              // ensure that only one watcher invocation thread can kill!
-              if (killed.compareAndSet(false, true))  {
-                log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
-                CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
-                try {
-                  Thread.sleep(1000 + random().nextInt(500));
-                  ChaosMonkey.kill(cjetty);
-                  stop.set(true);
-                  return true;
-                } catch (Exception e) {
-                  log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e);
-                }
-              }
+    Runnable monkey = () -> {
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
+        if (stop.get()) {
+          return true; // abort and remove the watch
+        }
+        Slice slice = collectionState.getSlice(SHARD1_0);
+        if (slice != null && slice.getReplicas().size() > 1) {
+          // ensure that only one watcher invocation thread can kill!
+          if (killed.compareAndSet(false, true))  {
+            log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
+            CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
+            try {
+              Thread.sleep(1000 + random().nextInt(500));
+              ChaosMonkey.kill(cjetty);
+              stop.set(true);
+              return true;
+            } catch (Exception e) {
+              log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e);
             }
-            log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
-            return false;
           }
-        });
-      }
+        }
+        log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
+        return false;
+      });
     };
 
-    Thread monkeyThread = null;
-    monkeyThread = new Thread(monkey);
+    Thread monkeyThread = new Thread(monkey);
     monkeyThread.start();
     try {
       CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
@@ -497,29 +483,26 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
         waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true);
         // let's wait for the overseer to switch shard states
         CountDownLatch latch = new CountDownLatch(1);
-        cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() {
-          @Override
-          public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
-            Slice parent = collectionState.getSlice(SHARD1);
-            Slice slice10 = collectionState.getSlice(SHARD1_0);
-            Slice slice11 = collectionState.getSlice(SHARD1_1);
-            if (slice10 != null && slice11 != null &&
-                parent.getState() == Slice.State.INACTIVE &&
-                slice10.getState() == Slice.State.ACTIVE &&
-                slice11.getState() == Slice.State.ACTIVE) {
-              areSubShardsActive.set(true);
-              latch.countDown();
-              return true; // removes the watch
-            } else if (slice10 != null && slice11 != null &&
-                parent.getState() == Slice.State.ACTIVE &&
-                slice10.getState() == Slice.State.RECOVERY_FAILED &&
-                slice11.getState() == Slice.State.RECOVERY_FAILED) {
-              areSubShardsActive.set(false);
-              latch.countDown();
-              return true;
-            }
-            return false;
+        cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
+          Slice parent = collectionState.getSlice(SHARD1);
+          Slice slice10 = collectionState.getSlice(SHARD1_0);
+          Slice slice11 = collectionState.getSlice(SHARD1_1);
+          if (slice10 != null && slice11 != null &&
+              parent.getState() == Slice.State.INACTIVE &&
+              slice10.getState() == Slice.State.ACTIVE &&
+              slice11.getState() == Slice.State.ACTIVE) {
+            areSubShardsActive.set(true);
+            latch.countDown();
+            return true; // removes the watch
+          } else if (slice10 != null && slice11 != null &&
+              parent.getState() == Slice.State.ACTIVE &&
+              slice10.getState() == Slice.State.RECOVERY_FAILED &&
+              slice11.getState() == Slice.State.RECOVERY_FAILED) {
+            areSubShardsActive.set(false);
+            latch.countDown();
+            return true;
           }
+          return false;
         });
 
         latch.await(2, TimeUnit.MINUTES);
@@ -660,37 +643,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
     }
     commit();
 
-    Thread indexThread = new Thread() {
-      @Override
-      public void run() {
-        Random random = random();
-        int max = atLeast(random, 401);
-        int sleep = atLeast(random, 25);
-        log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms");
-        Set<String> deleted = new HashSet<>();
-        for (int id = 101; id < max; id++) {
-          try {
-            indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
-            Thread.sleep(sleep);
-            if (usually(random))  {
-              String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
-              if (deleted.contains(delId))  continue;
-              try {
-                deleteAndUpdateCount(router, ranges, docCounts, delId);
-                deleted.add(delId);
-                documentIds.remove(String.valueOf(delId));
-              } catch (Exception e) {
-                log.error("Exception while deleting docs", e);
-              }
+    Thread indexThread = new Thread(() -> {
+      Random random = random();
+      int max = atLeast(random, 401);
+      int sleep = atLeast(random, 25);
+      log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms");
+      Set<String> deleted = new HashSet<>();
+      for (int id = 101; id < max; id++) {
+        try {
+          indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
+          Thread.sleep(sleep);
+          if (usually(random))  {
+            String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
+            if (deleted.contains(delId))  continue;
+            try {
+              deleteAndUpdateCount(router, ranges, docCounts, delId);
+              deleted.add(delId);
+              documentIds.remove(String.valueOf(delId));
+            } catch (Exception e) {
+              log.error("Exception while deleting docs", e);
             }
-          } catch (Exception e) {
-            log.error("Exception while adding doc id = " + id, e);
-            // do not select this id for deletion ever
-            deleted.add(String.valueOf(id));
           }
+        } catch (Exception e) {
+          log.error("Exception while adding doc id = " + id, e);
+          // do not select this id for deletion ever
+          deleted.add(String.valueOf(id));
         }
       }
-    };
+    });
     indexThread.start();
 
     try {
@@ -776,20 +756,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
 
       collectionClient.commit();
 
-      for (int i = 0; i < 3; i++) {
-        try {
-          splitShard(collectionName, SHARD1, null, null, false);
-          break;
-        } catch (HttpSolrClient.RemoteSolrException e) {
-          if (e.code() != 500) {
-            throw e;
-          }
-          log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
-          if (i == 2) {
-            fail("SPLITSHARD was not successful even after three tries");
-          }
-        }
-      }
+      trySplit(collectionName, null, SHARD1, 3);
 
       waitForRecoveriesToFinish(collectionName, false);
 
@@ -858,20 +825,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
 
       collectionClient.commit();
 
-      for (int i = 0; i < 3; i++) {
-        try {
-          splitShard(collectionName, null, null, splitKey, false);
-          break;
-        } catch (HttpSolrClient.RemoteSolrException e) {
-          if (e.code() != 500) {
-            throw e;
-          }
-          log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
-          if (i == 2) {
-            fail("SPLITSHARD was not successful even after three tries");
-          }
-        }
-      }
+      trySplit(collectionName, splitKey, null, 3);
 
       waitForRecoveriesToFinish(collectionName, false);
       SolrQuery solrQuery = new SolrQuery("*:*");
@@ -886,6 +840,23 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
     }
   }
 
+  private void trySplit(String collectionName, String splitKey, String shardId, int maxTries) throws SolrServerException, IOException {
+    for (int i = 0; i < maxTries; i++) {
+      try {
+        splitShard(collectionName, shardId, null, splitKey, false);
+        break;
+      } catch (HttpSolrClient.RemoteSolrException e) {
+        if (e.code() != 500) {
+          throw e;
+        }
+        log.error("SPLITSHARD failed. " + (i < maxTries - 1 ? " Retring split" : ""), e);
+        if (i == 2) {
+          fail("SPLITSHARD was not successful even after three tries");
+        }
+      }
+    }
+  }
+
   protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set<String> documentIds) throws Exception {
     ClusterState clusterState = null;
     Slice slice1_0 = null, slice1_1 = null;