You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/08/21 01:30:53 UTC
[26/50] [abbrv] lucene-solr:jira/http2: SOLR-12607: Minor refactorings
SOLR-12607: Minor refactorings
Replaced a few private instances with lambdas and extracted common code for retrying splits into a new method
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/57b33c19
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/57b33c19
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/57b33c19
Branch: refs/heads/jira/http2
Commit: 57b33c19a4a8d8fe675c190fd72144c317ed43be
Parents: 94ecb06
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Thu Aug 16 16:07:05 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Thu Aug 16 16:07:05 2018 +0530
----------------------------------------------------------------------
.../cloud/api/collections/ShardSplitTest.java | 249 ++++++++-----------
1 file changed, 110 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/57b33c19/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 2b48fbe..6619ee5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -49,7 +49,6 @@ import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
@@ -80,8 +79,8 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String SHARD1_0 = SHARD1 + "_0";
- public static final String SHARD1_1 = SHARD1 + "_1";
+ private static final String SHARD1_0 = SHARD1 + "_0";
+ private static final String SHARD1_1 = SHARD1 + "_1";
public ShardSplitTest() {
schemaString = "schema15.xml"; // we need a string id
@@ -161,21 +160,18 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
waitForRecoveriesToFinish(collectionName, true);
// let's wait to see parent shard become inactive
CountDownLatch latch = new CountDownLatch(1);
- client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
- @Override
- public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
- Slice parent = collectionState.getSlice(SHARD1);
- Slice slice10 = collectionState.getSlice(SHARD1_0);
- Slice slice11 = collectionState.getSlice(SHARD1_1);
- if (slice10 != null && slice11 != null &&
- parent.getState() == Slice.State.INACTIVE &&
- slice10.getState() == Slice.State.ACTIVE &&
- slice11.getState() == Slice.State.ACTIVE) {
- latch.countDown();
- return true; // removes the watch
- }
- return false;
+ client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
+ Slice parent = collectionState.getSlice(SHARD1);
+ Slice slice10 = collectionState.getSlice(SHARD1_0);
+ Slice slice11 = collectionState.getSlice(SHARD1_1);
+ if (slice10 != null && slice11 != null &&
+ parent.getState() == Slice.State.INACTIVE &&
+ slice10.getState() == Slice.State.ACTIVE &&
+ slice11.getState() == Slice.State.ACTIVE) {
+ latch.countDown();
+ return true; // removes the watch
}
+ return false;
});
latch.await(1, TimeUnit.MINUTES);
if (latch.getCount() != 0) {
@@ -211,22 +207,19 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
}
if (state == RequestStatusState.COMPLETED) {
CountDownLatch newReplicaLatch = new CountDownLatch(1);
- client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
- @Override
- public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
- if (liveNodes.size() != liveNodeCount) {
- return false;
- }
- Slice slice = collectionState.getSlice(SHARD1_0);
- if (slice.getReplicas().size() == 2) {
- if (!slice.getReplicas().stream().anyMatch(r -> r.getState() == Replica.State.RECOVERING)) {
- // we see replicas and none of them are recovering
- newReplicaLatch.countDown();
- return true;
- }
- }
+ client.getZkStateReader().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
+ if (liveNodes.size() != liveNodeCount) {
return false;
}
+ Slice slice = collectionState.getSlice(SHARD1_0);
+ if (slice.getReplicas().size() == 2) {
+ if (slice.getReplicas().stream().noneMatch(r -> r.getState() == Replica.State.RECOVERING)) {
+ // we see replicas and none of them are recovering
+ newReplicaLatch.countDown();
+ return true;
+ }
+ }
+ return false;
});
newReplicaLatch.await(30, TimeUnit.SECONDS);
// check consistency of sub-shard replica explicitly because checkShardConsistency methods doesn't
@@ -422,41 +415,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
AtomicBoolean stop = new AtomicBoolean();
AtomicBoolean killed = new AtomicBoolean(false);
- Runnable monkey = new Runnable() {
- @Override
- public void run() {
- ZkStateReader zkStateReader = cloudClient.getZkStateReader();
- zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() {
- @Override
- public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
- if (stop.get()) {
- return true; // abort and remove the watch
- }
- Slice slice = collectionState.getSlice(SHARD1_0);
- if (slice != null && slice.getReplicas().size() > 1) {
- // ensure that only one watcher invocation thread can kill!
- if (killed.compareAndSet(false, true)) {
- log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
- CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
- try {
- Thread.sleep(1000 + random().nextInt(500));
- ChaosMonkey.kill(cjetty);
- stop.set(true);
- return true;
- } catch (Exception e) {
- log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e);
- }
- }
+ Runnable monkey = () -> {
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+ zkStateReader.registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
+ if (stop.get()) {
+ return true; // abort and remove the watch
+ }
+ Slice slice = collectionState.getSlice(SHARD1_0);
+ if (slice != null && slice.getReplicas().size() > 1) {
+ // ensure that only one watcher invocation thread can kill!
+ if (killed.compareAndSet(false, true)) {
+ log.info("Monkey thread found 2 replicas for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
+ CloudJettyRunner cjetty = shardToLeaderJetty.get(SHARD1);
+ try {
+ Thread.sleep(1000 + random().nextInt(500));
+ ChaosMonkey.kill(cjetty);
+ stop.set(true);
+ return true;
+ } catch (Exception e) {
+ log.error("Monkey unable to kill jetty at port " + cjetty.jetty.getLocalPort(), e);
}
- log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
- return false;
}
- });
- }
+ }
+ log.info("Monkey thread found only one replica for {} {}", AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1);
+ return false;
+ });
};
- Thread monkeyThread = null;
- monkeyThread = new Thread(monkey);
+ Thread monkeyThread = new Thread(monkey);
monkeyThread.start();
try {
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
@@ -497,29 +483,26 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
waitForRecoveriesToFinish(AbstractDistribZkTestBase.DEFAULT_COLLECTION, true);
// let's wait for the overseer to switch shard states
CountDownLatch latch = new CountDownLatch(1);
- cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, new CollectionStateWatcher() {
- @Override
- public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
- Slice parent = collectionState.getSlice(SHARD1);
- Slice slice10 = collectionState.getSlice(SHARD1_0);
- Slice slice11 = collectionState.getSlice(SHARD1_1);
- if (slice10 != null && slice11 != null &&
- parent.getState() == Slice.State.INACTIVE &&
- slice10.getState() == Slice.State.ACTIVE &&
- slice11.getState() == Slice.State.ACTIVE) {
- areSubShardsActive.set(true);
- latch.countDown();
- return true; // removes the watch
- } else if (slice10 != null && slice11 != null &&
- parent.getState() == Slice.State.ACTIVE &&
- slice10.getState() == Slice.State.RECOVERY_FAILED &&
- slice11.getState() == Slice.State.RECOVERY_FAILED) {
- areSubShardsActive.set(false);
- latch.countDown();
- return true;
- }
- return false;
+ cloudClient.getZkStateReader().registerCollectionStateWatcher(AbstractDistribZkTestBase.DEFAULT_COLLECTION, (liveNodes, collectionState) -> {
+ Slice parent = collectionState.getSlice(SHARD1);
+ Slice slice10 = collectionState.getSlice(SHARD1_0);
+ Slice slice11 = collectionState.getSlice(SHARD1_1);
+ if (slice10 != null && slice11 != null &&
+ parent.getState() == Slice.State.INACTIVE &&
+ slice10.getState() == Slice.State.ACTIVE &&
+ slice11.getState() == Slice.State.ACTIVE) {
+ areSubShardsActive.set(true);
+ latch.countDown();
+ return true; // removes the watch
+ } else if (slice10 != null && slice11 != null &&
+ parent.getState() == Slice.State.ACTIVE &&
+ slice10.getState() == Slice.State.RECOVERY_FAILED &&
+ slice11.getState() == Slice.State.RECOVERY_FAILED) {
+ areSubShardsActive.set(false);
+ latch.countDown();
+ return true;
}
+ return false;
});
latch.await(2, TimeUnit.MINUTES);
@@ -660,37 +643,34 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
}
commit();
- Thread indexThread = new Thread() {
- @Override
- public void run() {
- Random random = random();
- int max = atLeast(random, 401);
- int sleep = atLeast(random, 25);
- log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms");
- Set<String> deleted = new HashSet<>();
- for (int id = 101; id < max; id++) {
- try {
- indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
- Thread.sleep(sleep);
- if (usually(random)) {
- String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
- if (deleted.contains(delId)) continue;
- try {
- deleteAndUpdateCount(router, ranges, docCounts, delId);
- deleted.add(delId);
- documentIds.remove(String.valueOf(delId));
- } catch (Exception e) {
- log.error("Exception while deleting docs", e);
- }
+ Thread indexThread = new Thread(() -> {
+ Random random = random();
+ int max = atLeast(random, 401);
+ int sleep = atLeast(random, 25);
+ log.info("SHARDSPLITTEST: Going to add " + max + " number of docs at 1 doc per " + sleep + "ms");
+ Set<String> deleted = new HashSet<>();
+ for (int id = 101; id < max; id++) {
+ try {
+ indexAndUpdateCount(router, ranges, docCounts, String.valueOf(id), id, documentIds);
+ Thread.sleep(sleep);
+ if (usually(random)) {
+ String delId = String.valueOf(random.nextInt(id - 101 + 1) + 101);
+ if (deleted.contains(delId)) continue;
+ try {
+ deleteAndUpdateCount(router, ranges, docCounts, delId);
+ deleted.add(delId);
+ documentIds.remove(String.valueOf(delId));
+ } catch (Exception e) {
+ log.error("Exception while deleting docs", e);
}
- } catch (Exception e) {
- log.error("Exception while adding doc id = " + id, e);
- // do not select this id for deletion ever
- deleted.add(String.valueOf(id));
}
+ } catch (Exception e) {
+ log.error("Exception while adding doc id = " + id, e);
+ // do not select this id for deletion ever
+ deleted.add(String.valueOf(id));
}
}
- };
+ });
indexThread.start();
try {
@@ -776,20 +756,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
collectionClient.commit();
- for (int i = 0; i < 3; i++) {
- try {
- splitShard(collectionName, SHARD1, null, null, false);
- break;
- } catch (HttpSolrClient.RemoteSolrException e) {
- if (e.code() != 500) {
- throw e;
- }
- log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
- if (i == 2) {
- fail("SPLITSHARD was not successful even after three tries");
- }
- }
- }
+ trySplit(collectionName, null, SHARD1, 3);
waitForRecoveriesToFinish(collectionName, false);
@@ -858,20 +825,7 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
collectionClient.commit();
- for (int i = 0; i < 3; i++) {
- try {
- splitShard(collectionName, null, null, splitKey, false);
- break;
- } catch (HttpSolrClient.RemoteSolrException e) {
- if (e.code() != 500) {
- throw e;
- }
- log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
- if (i == 2) {
- fail("SPLITSHARD was not successful even after three tries");
- }
- }
- }
+ trySplit(collectionName, splitKey, null, 3);
waitForRecoveriesToFinish(collectionName, false);
SolrQuery solrQuery = new SolrQuery("*:*");
@@ -886,6 +840,23 @@ public class ShardSplitTest extends AbstractFullDistribZkTestBase {
}
}
+ private void trySplit(String collectionName, String splitKey, String shardId, int maxTries) throws SolrServerException, IOException {
+ for (int i = 0; i < maxTries; i++) {
+ try {
+ splitShard(collectionName, shardId, null, splitKey, false);
+ break;
+ } catch (HttpSolrClient.RemoteSolrException e) {
+ if (e.code() != 500) {
+ throw e;
+ }
+ log.error("SPLITSHARD failed. " + (i < maxTries - 1 ? " Retring split" : ""), e);
+ if (i == 2) {
+ fail("SPLITSHARD was not successful even after three tries");
+ }
+ }
+ }
+ }
+
protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas, Set<String> documentIds) throws Exception {
ClusterState clusterState = null;
Slice slice1_0 = null, slice1_1 = null;