You are viewing a plain-text version of this content. The canonical version was linked by a hyperlink that is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2018/11/30 18:10:29 UTC
[02/17] lucene-solr:branch_7x: SOLR-12801: Make massive improvements to the tests.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb652b84/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index 5f0e596..444649d 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -19,6 +19,8 @@ package org.apache.solr.cloud;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
@@ -50,7 +52,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
private static final String ZK_HOST = "zkHost";
private static final String ZOOKEEPER_FORCE_SYNC = "zookeeper.forceSync";
protected static final String DEFAULT_COLLECTION = "collection1";
- protected ZkTestServer zkServer;
+ protected volatile ZkTestServer zkServer;
private AtomicInteger homeCount = new AtomicInteger();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -78,7 +80,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
String schema = getCloudSchemaFile();
if (schema == null) schema = "schema.xml";
- AbstractZkTestCase.buildZooKeeper(zkServer.getZkHost(), zkServer.getZkAddress(), getCloudSolrConfig(), schema);
+ zkServer.buildZooKeeper(getCloudSolrConfig(), schema);
// set some system properties for use by tests
System.setProperty("solr.test.sys.prop1", "propone");
@@ -101,12 +103,18 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
setupJettySolrHome(controlHome);
controlJetty = createJetty(controlHome, null); // let the shardId default to shard1
+ controlJetty.start();
controlClient = createNewSolrClient(controlJetty.getLocalPort());
assertTrue(CollectionAdminRequest
.createCollection("control_collection", 1, 1)
.setCreateNodeSet(controlJetty.getNodeName())
.process(controlClient).isSuccess());
+
+ ZkStateReader zkStateReader = jettys.get(0).getCoreContainer().getZkController()
+ .getZkStateReader();
+
+ waitForRecoveriesToFinish("control_collection", zkStateReader, false, true, 15);
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= numShards; i++) {
@@ -115,19 +123,14 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
File jettyHome = new File(new File(getSolrHome()).getParentFile(), "jetty" + homeCount.incrementAndGet());
setupJettySolrHome(jettyHome);
JettySolrRunner j = createJetty(jettyHome, null, "shard" + (i + 2));
+ j.start();
jettys.add(j);
clients.add(createNewSolrClient(j.getLocalPort()));
sb.append(buildUrl(j.getLocalPort()));
}
shards = sb.toString();
-
- // now wait till we see the leader for each shard
- for (int i = 1; i <= numShards; i++) {
- ZkStateReader zkStateReader = jettys.get(0).getCoreContainer().getZkController()
- .getZkStateReader();
- zkStateReader.getLeaderRetry("collection1", "shard" + (i + 2), 15000);
- }
+
}
protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose)
@@ -141,89 +144,71 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
}
public static void waitForRecoveriesToFinish(String collection,
- ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
+ ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, long timeoutSeconds)
throws Exception {
log.info("Wait for recoveries to finish - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
- boolean cont = true;
- int cnt = 0;
-
- while (cont) {
- if (verbose) System.out.println("-");
- boolean sawLiveRecovering = false;
- ClusterState clusterState = zkStateReader.getClusterState();
- final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
- assertNotNull("Could not find collection:" + collection, docCollection);
- Map<String,Slice> slices = docCollection.getSlicesMap();
- assertNotNull("Could not find collection:" + collection, slices);
- for (Map.Entry<String,Slice> entry : slices.entrySet()) {
- Slice slice = entry.getValue();
- if (slice.getState() == Slice.State.CONSTRUCTION) { // similar to replica recovering; pretend its the same thing
- if (verbose) System.out.println("Found a slice in construction state; will wait.");
- sawLiveRecovering = true;
- }
- Map<String,Replica> shards = slice.getReplicasMap();
- for (Map.Entry<String,Replica> shard : shards.entrySet()) {
- if (verbose) System.out.println("replica:" + shard.getValue().getName() + " rstate:"
- + shard.getValue().getStr(ZkStateReader.STATE_PROP)
- + " live:"
- + clusterState.liveNodesContain(shard.getValue().getNodeName()));
- final Replica.State state = shard.getValue().getState();
- if ((state == Replica.State.RECOVERING || state == Replica.State.DOWN || state == Replica.State.RECOVERY_FAILED)
- && clusterState.liveNodesContain(shard.getValue().getStr(ZkStateReader.NODE_NAME_PROP))) {
+ try {
+ zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> {
+ if (docCollection == null)
+ return false;
+ boolean sawLiveRecovering = false;
+
+ assertNotNull("Could not find collection:" + collection, docCollection);
+ Map<String,Slice> slices = docCollection.getSlicesMap();
+ assertNotNull("Could not find collection:" + collection, slices);
+ for (Map.Entry<String,Slice> entry : slices.entrySet()) {
+ Slice slice = entry.getValue();
+ if (slice.getState() == Slice.State.CONSTRUCTION) { // similar to replica recovering; pretend its the same
+ // thing
+ if (verbose) System.out.println("Found a slice in construction state; will wait.");
sawLiveRecovering = true;
}
+ Map<String,Replica> shards = slice.getReplicasMap();
+ for (Map.Entry<String,Replica> shard : shards.entrySet()) {
+ if (verbose) System.out.println("replica:" + shard.getValue().getName() + " rstate:"
+ + shard.getValue().getStr(ZkStateReader.STATE_PROP)
+ + " live:"
+ + liveNodes.contains(shard.getValue().getNodeName()));
+ final Replica.State state = shard.getValue().getState();
+ if ((state == Replica.State.RECOVERING || state == Replica.State.DOWN
+ || state == Replica.State.RECOVERY_FAILED)
+ && liveNodes.contains(shard.getValue().getStr(ZkStateReader.NODE_NAME_PROP))) {
+ return false;
+ }
+ }
}
- }
- if (!sawLiveRecovering || cnt == timeoutSeconds) {
if (!sawLiveRecovering) {
- if (verbose) System.out.println("no one is recoverying");
- } else {
- if (verbose) System.out.println("Gave up waiting for recovery to finish..");
- if (failOnTimeout) {
- Diagnostics.logThreadDumps("Gave up waiting for recovery to finish. THREAD DUMP:");
- zkStateReader.getZkClient().printLayoutToStdOut();
- fail("There are still nodes recoverying - waited for " + timeoutSeconds + " seconds");
- // won't get here
- return;
+ if (!sawLiveRecovering) {
+ if (verbose) System.out.println("no one is recoverying");
+ } else {
+ if (verbose) System.out.println("Gave up waiting for recovery to finish..");
+ return false;
}
+ return true;
+ } else {
+ return false;
}
- cont = false;
- } else {
- Thread.sleep(1000);
- }
- cnt++;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ Diagnostics.logThreadDumps("Gave up waiting for recovery to finish. THREAD DUMP:");
+ zkStateReader.getZkClient().printLayoutToStdOut();
+ fail("There are still nodes recoverying - waited for " + timeoutSeconds + " seconds");
}
log.info("Recoveries finished - collection: " + collection);
}
+
public static void waitForCollectionToDisappear(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
throws Exception {
log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
- boolean cont = true;
- int cnt = 0;
-
- while (cont) {
- if (verbose) System.out.println("-");
- ClusterState clusterState = zkStateReader.getClusterState();
- if (!clusterState.hasCollection(collection)) break;
- if (cnt == timeoutSeconds) {
- if (verbose) System.out.println("Gave up waiting for "+collection+" to disappear..");
- if (failOnTimeout) {
- Diagnostics.logThreadDumps("Gave up waiting for "+collection+" to disappear. THREAD DUMP:");
- zkStateReader.getZkClient().printLayoutToStdOut();
- fail("The collection ("+collection+") is still present - waited for " + timeoutSeconds + " seconds");
- // won't get here
- return;
- }
- cont = false;
- } else {
- Thread.sleep(1000);
- }
- cnt++;
- }
+ zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> {
+ if (docCollection == null)
+ return true;
+ return false;
+ });
log.info("Collection has disappeared - collection: " + collection);
}
@@ -250,26 +235,26 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
Thread.sleep(100);
}
+
+ zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (liveNodes, docCollection) -> {
+ if (docCollection == null)
+ return false;
+
+ Slice slice = docCollection.getSlice(shardName);
+ if (slice != null && slice.getLeader() != null && !slice.getLeader().equals(oldLeader) && slice.getLeader().getState() == Replica.State.ACTIVE) {
+ log.info("Old leader {}, new leader {}. New leader got elected in {} ms", oldLeader, slice.getLeader(), timeOut.timeElapsed(MILLISECONDS) );
+ return true;
+ }
+ return false;
+ });
}
- public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException {
- int maxIterations = 100;
- Replica.State coreState = null;
- while(maxIterations-->0) {
- final DocCollection docCollection = reader.getClusterState().getCollectionOrNull(collection);
- if(docCollection != null && docCollection.getSlice(shard)!=null) {
- Slice slice = docCollection.getSlice(shard);
- Replica replica = slice.getReplicasMap().get(coreNodeName);
- if (replica != null) {
- coreState = replica.getState();
- if(coreState == expectedState) {
- return;
- }
- }
- }
- Thread.sleep(50);
- }
- fail("Illegal state, was: " + coreState + " expected:" + expectedState + " clusterState:" + reader.getClusterState());
+ public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName,
+ Replica.State expectedState) throws InterruptedException, TimeoutException {
+ reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS,
+ (liveNodes, collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
+ && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null
+ && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState);
}
protected static void assertAllActive(String collection, ZkStateReader zkStateReader)
@@ -300,22 +285,28 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
@Override
public void distribTearDown() throws Exception {
- System.clearProperty(ZK_HOST);
- System.clearProperty("collection");
- System.clearProperty(ENABLE_UPDATE_LOG);
- System.clearProperty(REMOVE_VERSION_FIELD);
- System.clearProperty("solr.directoryFactory");
- System.clearProperty("solr.test.sys.prop1");
- System.clearProperty("solr.test.sys.prop2");
- System.clearProperty(ZOOKEEPER_FORCE_SYNC);
- System.clearProperty(MockDirectoryFactory.SOLR_TESTS_ALLOW_READING_FILES_STILL_OPEN_FOR_WRITE);
-
resetExceptionIgnores();
+
try {
- super.distribTearDown();
- }
- finally {
zkServer.shutdown();
+ } catch (Exception e) {
+ throw new RuntimeException("Exception shutting down Zk Test Server.", e);
+ } finally {
+ try {
+ super.distribTearDown();
+ } finally {
+ System.clearProperty(ZK_HOST);
+ System.clearProperty("collection");
+ System.clearProperty(ENABLE_UPDATE_LOG);
+ System.clearProperty(REMOVE_VERSION_FIELD);
+ System.clearProperty("solr.directoryFactory");
+ System.clearProperty("solr.test.sys.prop1");
+ System.clearProperty("solr.test.sys.prop2");
+ System.clearProperty(ZOOKEEPER_FORCE_SYNC);
+ System.clearProperty(MockDirectoryFactory.SOLR_TESTS_ALLOW_READING_FILES_STILL_OPEN_FOR_WRITE);
+
+ }
+
}
}
@@ -331,6 +322,6 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
// disconnect enough to test stalling, if things stall, then clientSoTimeout w""ill be hit
Thread.sleep(pauseMillis);
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
- zkServer.run();
+ zkServer.run(false);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb652b84/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 9d0e4bf..2fdb4b1 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -16,11 +16,12 @@
*/
package org.apache.solr.cloud;
+import static org.apache.solr.common.util.Utils.makeMap;
+
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
-import java.net.URL;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,7 +36,10 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
@@ -44,6 +48,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -56,10 +61,12 @@ import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -72,6 +79,8 @@ import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.TimeSource;
@@ -91,14 +100,13 @@ import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.noggit.CharArr;
import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.util.Utils.makeMap;
-
/**
* TODO: we should still test this works as a custom update chain as well as
* what we test now - the default update chain
@@ -109,6 +117,12 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
@BeforeClass
public static void beforeFullSolrCloudTest() {
+
+ }
+
+ @Before
+ public void beforeTest() {
+ cloudInit = false;
}
public static final String SHARD1 = "shard1";
@@ -124,22 +138,20 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
String missingField = "ignore_exception__missing_but_valid_field_t";
protected int sliceCount;
- protected CloudSolrClient controlClientCloud; // cloud version of the control client
+ protected volatile CloudSolrClient controlClientCloud; // cloud version of the control client
protected volatile CloudSolrClient cloudClient;
- protected List<SolrClient> coreClients = new ArrayList<>();
+ protected final List<SolrClient> coreClients = Collections.synchronizedList(new ArrayList<>());
- protected List<CloudJettyRunner> cloudJettys = new ArrayList<>();
- protected Map<String,List<CloudJettyRunner>> shardToJetty = new HashMap<>();
+ protected final List<CloudJettyRunner> cloudJettys = Collections.synchronizedList(new ArrayList<>());
+ protected final Map<String,List<CloudJettyRunner>> shardToJetty = new ConcurrentHashMap<>();
private AtomicInteger jettyIntCntr = new AtomicInteger(0);
- protected ChaosMonkey chaosMonkey;
-
- protected Map<String,CloudJettyRunner> shardToLeaderJetty = new HashMap<>();
- private boolean cloudInit;
- protected boolean useJettyDataDir = true;
+ protected volatile ChaosMonkey chaosMonkey;
- private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
+ protected Map<String,CloudJettyRunner> shardToLeaderJetty = new ConcurrentHashMap<>();
+ private static volatile boolean cloudInit;
+ protected volatile boolean useJettyDataDir = true;
- protected Map<URI,SocketProxy> proxies = new HashMap<>();
+ private final List<RestTestHarness> restTestHarnesses = Collections.synchronizedList(new ArrayList<>());
public static class CloudJettyRunner {
public JettySolrRunner jetty;
@@ -232,6 +244,9 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
public void distribSetUp() throws Exception {
super.distribSetUp();
// ignoreException(".*");
+
+ cloudInit = false;
+
if (sliceCount > 0) {
System.setProperty("numShards", Integer.toString(sliceCount));
} else {
@@ -303,24 +318,27 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
protected CloudSolrClient createCloudClient(String defaultCollection) {
- CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean(), 30000, 60000);
+ CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean(), 30000, 120000);
if (defaultCollection != null) client.setDefaultCollection(defaultCollection);
return client;
}
@Override
protected void createServers(int numServers) throws Exception {
-
File controlJettyDir = createTempDir("control").toFile();
setupJettySolrHome(controlJettyDir);
controlJetty = createJetty(controlJettyDir, useJettyDataDir ? getDataDir(testDir
+ "/control/data") : null);
- try (SolrClient client = createCloudClient("control_collection")) {
+ controlJetty.start();
+ try (CloudSolrClient client = createCloudClient("control_collection")) {
assertEquals(0, CollectionAdminRequest
.createCollection("control_collection", "conf1", 1, 1)
.setCreateNodeSet(controlJetty.getNodeName())
.process(client).getStatus());
- }
+ waitForActiveReplicaCount(client, "control_collection", 1);
+ }
+
+
controlClient = new HttpSolrClient.Builder(controlJetty.getBaseUrl() + "/control_collection").build();
if (sliceCount <= 0) {
// for now, just create the cloud client for the control if we don't
@@ -328,8 +346,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
// this can change if more tests need it.
controlClientCloud = createCloudClient("control_collection");
controlClientCloud.connect();
- waitForCollection(controlClientCloud.getZkStateReader(),
- "control_collection", 0);
// NOTE: we are skipping creation of the chaos monkey by returning here
cloudClient = controlClientCloud; // temporary - some code needs/uses
// cloudClient
@@ -339,12 +355,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
initCloud();
createJettys(numServers);
-
- int cnt = getTotalReplicas(DEFAULT_COLLECTION);
- if (cnt > 0) {
- waitForCollection(cloudClient.getZkStateReader(), DEFAULT_COLLECTION, sliceCount);
- }
-
+
}
public static void waitForCollection(ZkStateReader reader, String collection, int slices) throws Exception {
@@ -381,8 +392,10 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
protected List<JettySolrRunner> createJettys(int numJettys) throws Exception {
- List<JettySolrRunner> jettys = new ArrayList<>();
- List<SolrClient> clients = new ArrayList<>();
+ List<JettySolrRunner> jettys = Collections.synchronizedList(new ArrayList<>());
+ List<SolrClient> clients = Collections.synchronizedList(new ArrayList<>());
+ List<CollectionAdminRequest> createReplicaRequests = Collections.synchronizedList(new ArrayList<>());
+ List<CollectionAdminRequest> createPullReplicaRequests = Collections.synchronizedList(new ArrayList<>());
StringBuilder sb = new StringBuilder();
assertEquals(0, CollectionAdminRequest
@@ -391,7 +404,15 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
.setCreateNodeSet("")
.process(cloudClient).getStatus());
+ cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (l,c) -> c != null && c.getSlices().size() == sliceCount);
+
+ ForkJoinPool customThreadPool = new ForkJoinPool(12);
+
int numOtherReplicas = numJettys - getPullReplicaCount() * sliceCount;
+
+ log.info("Creating jetty instances pullReplicaCount={} numOtherReplicas={}", getPullReplicaCount(), numOtherReplicas);
+
+ int addedReplicas = 0;
for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
int cnt = this.jettyIntCntr.incrementAndGet();
@@ -400,66 +421,126 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
jettyDir.mkdirs();
setupJettySolrHome(jettyDir);
- JettySolrRunner j;
-
- CollectionAdminResponse response;
+ int currentI = i;
if (numOtherReplicas > 0) {
numOtherReplicas--;
if (useTlogReplicas()) {
- log.info("create jetty {} in directory {} of type {}", i, jettyDir, Replica.Type.TLOG);
- j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
- + cnt) : null, null, "solrconfig.xml", null, Replica.Type.TLOG);
- response = CollectionAdminRequest
- .addReplicaToShard(DEFAULT_COLLECTION, "shard"+((i%sliceCount)+1))
- .setNode(j.getNodeName())
- .setType(Replica.Type.TLOG)
- .process(cloudClient);
+ log.info("create jetty {} in directory {} of type {} in shard {}", i, jettyDir, Replica.Type.TLOG, ((currentI % sliceCount) + 1));
+ customThreadPool.submit(() -> Collections.singleton(controlClient).parallelStream().forEach(c -> {
+ try {
+ JettySolrRunner j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
+ + cnt) : null, null, "solrconfig.xml", null, Replica.Type.TLOG);
+ j.start();
+ jettys.add(j);
+ waitForLiveNode(j);
+
+ createReplicaRequests.add(CollectionAdminRequest
+ .addReplicaToShard(DEFAULT_COLLECTION, "shard" + ((currentI % sliceCount) + 1))
+ .setNode(j.getNodeName())
+ .setType(Replica.Type.TLOG));
+
+ coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
+ SolrClient client = createNewSolrClient(j.getLocalPort());
+ clients.add(client);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ addedReplicas++;
} else {
- log.info("create jetty {} in directory {} of type {}", i, jettyDir, Replica.Type.NRT);
- j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
- + cnt) : null, null, "solrconfig.xml", null, null);
- response = CollectionAdminRequest
- .addReplicaToShard(DEFAULT_COLLECTION, "shard"+((i%sliceCount)+1))
- .setNode(j.getNodeName())
- .setType(Replica.Type.NRT)
- .process(cloudClient);
+ log.info("create jetty {} in directory {} of type {}", i, jettyDir, Replica.Type.NRT, ((currentI % sliceCount) + 1));
+
+ customThreadPool.submit(() -> Collections.singleton(controlClient).parallelStream().forEach(c -> {
+ try {
+ JettySolrRunner j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
+ + cnt) : null, null, "solrconfig.xml", null, null);
+ j.start();
+ jettys.add(j);
+ waitForLiveNode(j);
+ createReplicaRequests.add(CollectionAdminRequest
+ .addReplicaToShard(DEFAULT_COLLECTION, "shard"+((currentI%sliceCount)+1))
+ .setNode(j.getNodeName())
+ .setType(Replica.Type.NRT));
+ coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
+ SolrClient client = createNewSolrClient(j.getLocalPort());
+ clients.add(client);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }));
+
+ addedReplicas++;
}
} else {
- log.info("create jetty {} in directory {} of type {}", i, jettyDir, Replica.Type.PULL);
- j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
- + cnt) : null, null, "solrconfig.xml", null, Replica.Type.PULL);
- response = CollectionAdminRequest
- .addReplicaToShard(DEFAULT_COLLECTION, "shard"+((i%sliceCount)+1))
- .setNode(j.getNodeName())
- .setType(Replica.Type.PULL)
- .process(cloudClient);
+ log.info("create jetty {} in directory {} of type {}", i, jettyDir, Replica.Type.PULL, ((currentI % sliceCount) + 1));
+ customThreadPool.submit(() -> Collections.singleton(controlClient).parallelStream().forEach(c -> {
+ try {
+ JettySolrRunner j = createJetty(jettyDir, useJettyDataDir ? getDataDir(testDir + "/jetty"
+ + cnt) : null, null, "solrconfig.xml", null, Replica.Type.PULL);
+ j.start();
+ jettys.add(j);
+ waitForLiveNode(j);
+ createPullReplicaRequests.add(CollectionAdminRequest
+ .addReplicaToShard(DEFAULT_COLLECTION, "shard"+((currentI%sliceCount)+1))
+ .setNode(j.getNodeName())
+ .setType(Replica.Type.PULL));
+ coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
+ SolrClient client = createNewSolrClient(j.getLocalPort());
+ clients.add(client);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ addedReplicas++;
+ }
+
+ }
+
+ ExecutorUtil.shutdownAndAwaitTermination(customThreadPool);
+
+ customThreadPool = new ForkJoinPool(12);
+ customThreadPool.submit(() -> createReplicaRequests.parallelStream().forEach(r -> {
+ CollectionAdminResponse response;
+ try {
+ response = (CollectionAdminResponse) r.process(cloudClient);
+ } catch (SolrServerException | IOException e) {
+ throw new RuntimeException(e);
}
- jettys.add(j);
+
assertTrue(response.isSuccess());
String coreName = response.getCollectionCoresStatus().keySet().iterator().next();
- coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
- SolrClient client = createNewSolrClient(j.getLocalPort());
- clients.add(client);
- }
+ }));
+
+ ExecutorUtil.shutdownAndAwaitTermination(customThreadPool);
+
+ customThreadPool = new ForkJoinPool(12);
+ customThreadPool.submit(() -> createPullReplicaRequests.parallelStream().forEach(r -> {
+ CollectionAdminResponse response;
+ try {
+ response = (CollectionAdminResponse) r.process(cloudClient);
+ } catch (SolrServerException | IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ assertTrue(response.isSuccess());
+ String coreName = response.getCollectionCoresStatus().keySet().iterator().next();
+ }));
+
+ ExecutorUtil.shutdownAndAwaitTermination(customThreadPool);
+
+ waitForActiveReplicaCount(cloudClient, DEFAULT_COLLECTION, addedReplicas);
this.jettys.addAll(jettys);
this.clients.addAll(clients);
- int numReplicas = getTotalReplicas(DEFAULT_COLLECTION);
- int expectedNumReplicas = numJettys;
-
- // now wait until we see that the number of shards in the cluster state
- // matches what we expect
- int retries = 0;
- while (numReplicas != expectedNumReplicas) {
- numReplicas = getTotalReplicas(DEFAULT_COLLECTION);
- if (numReplicas == expectedNumReplicas) break;
- if (retries++ == 60) {
- printLayoutOnTearDown = true;
- fail("Number of replicas in the state does not match what we set:" + numReplicas + " vs " + expectedNumReplicas);
- }
- Thread.sleep(500);
- }
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
// make sure we have a leader for each shard
@@ -467,7 +548,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + i, 10000);
}
- if (numReplicas > 0) {
+ if (sliceCount > 0) {
updateMappingsFromZk(this.jettys, this.clients);
}
@@ -484,47 +565,48 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
return jettys;
}
+ protected void waitForLiveNode(JettySolrRunner j) throws InterruptedException, TimeoutException {
+ cloudClient.getZkStateReader().waitForLiveNodes(30, TimeUnit.SECONDS, SolrCloudTestCase.containsLiveNode(j.getNodeName()));
+ }
+
+ protected void waitForActiveReplicaCount(CloudSolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException {
+ AtomicInteger nReplicas = new AtomicInteger();
+ try {
+ client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (n, c) -> {
+ if (c == null)
+ return false;
+ int numReplicas = getTotalReplicas(c, c.getName());
+ nReplicas.set(numReplicas);
+ if (numReplicas == expectedNumReplicas) return true;
+
+ return false;
+ });
+ } catch (TimeoutException | InterruptedException e) {
+ try {
+ printLayout();
+ } catch (Exception e1) {
+ throw new RuntimeException(e1);
+ }
+ throw new NotInClusterStateException(ErrorCode.SERVER_ERROR,
+ "Number of replicas in the state does not match what we set:" + nReplicas + " vs " + expectedNumReplicas);
+ }
+ }
+
protected int getPullReplicaCount() {
return 0;
}
/* Total number of replicas (number of cores serving an index to the collection) shown by the cluster state */
- protected int getTotalReplicas(String collection) {
- ZkStateReader zkStateReader = cloudClient.getZkStateReader();
- DocCollection coll = zkStateReader.getClusterState().getCollectionOrNull(collection);
- if (coll == null) return 0; // support for when collection hasn't been created yet
+ protected int getTotalReplicas(DocCollection c, String collection) {
+ if (c == null) return 0; // support for when collection hasn't been created yet
int cnt = 0;
- for (Slice slices : coll.getSlices()) {
+ for (Slice slices : c.getSlices()) {
cnt += slices.getReplicas().size();
}
return cnt;
}
- public JettySolrRunner createJetty(String dataDir, String ulogDir, String shardList,
- String solrConfigOverride) throws Exception {
-
- JettyConfig jettyconfig = JettyConfig.builder()
- .setContext(context)
- .stopAtShutdown(false)
- .withServlets(getExtraServlets())
- .withFilters(getExtraRequestFilters())
- .withSSLConfig(sslConfig)
- .build();
-
- Properties props = new Properties();
- props.setProperty("solr.data.dir", getDataDir(dataDir));
- props.setProperty("shards", shardList);
- props.setProperty("solr.ulog.dir", ulogDir);
- props.setProperty("solrconfig", solrConfigOverride);
-
- JettySolrRunner jetty = new JettySolrRunner(getSolrHome(), props, jettyconfig);
-
- jetty.start();
-
- return jetty;
- }
-
public final JettySolrRunner createJetty(File solrHome, String dataDir, String shardList, String solrConfigOverride, String schemaOverride) throws Exception {
return createJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, null);
}
@@ -560,7 +642,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
props.setProperty("coreRootDirectory", solrHome.toPath().resolve("cores").toAbsolutePath().toString());
JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), props, jettyconfig);
- jetty.start();
return jetty;
}
@@ -598,13 +679,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
props.setProperty("coreRootDirectory", solrHome.toPath().resolve("cores").toAbsolutePath().toString());
- JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), props, jettyconfig);
+ JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), props, jettyconfig, true);
- SocketProxy proxy = new SocketProxy(0, sslConfig != null && sslConfig.isSSLMode());
- jetty.setProxyPort(proxy.getListenPort());
- jetty.start();
- proxy.open(jetty.getBaseUrl().toURI());
- proxies.put(proxy.getUrl(), proxy);
return jetty;
}
@@ -640,15 +716,20 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
- URL baseUrl = new URL(replicaBaseUrl);
- SocketProxy proxy = proxies.get(baseUrl.toURI());
- if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
- baseUrl = new URL(baseUrl.toExternalForm() + "/");
- proxy = proxies.get(baseUrl.toURI());
+ List<JettySolrRunner> runners = new ArrayList<>(jettys);
+ runners.add(controlJetty);
+
+ for (JettySolrRunner j : runners) {
+ if (replicaBaseUrl.replaceAll("/$", "").equals(j.getProxyBaseUrl().toExternalForm().replaceAll("/$", ""))) {
+ return j.getProxy();
+ }
}
- assertNotNull("No proxy found for " + baseUrl + "!", proxy);
- return proxy;
+
+ printLayout();
+
+ fail("No proxy found for " + replicaBaseUrl + "!");
+ return null;
}
private File getRelativeSolrHomePath(File solrHome) {
@@ -1555,34 +1636,52 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
@Override
public void distribTearDown() throws Exception {
- if (VERBOSE || printLayoutOnTearDown) {
- super.printLayout();
- }
- closeRestTestHarnesses(); // TODO: close here or later?
- if (commonCloudSolrClient != null) {
- commonCloudSolrClient.close();
- }
- if (controlClient != null) {
- controlClient.close();
- }
- if (cloudClient != null) {
- cloudClient.close();
- }
- if (controlClientCloud != null) {
- controlClientCloud.close();
- }
- super.distribTearDown();
+ try {
+ if (VERBOSE || printLayoutOnTearDown) {
+ super.printLayout();
+ }
+
+ closeRestTestHarnesses(); // TODO: close here or later?
+
- System.clearProperty("zkHost");
- System.clearProperty("numShards");
+ } finally {
+ super.distribTearDown();
- // close socket proxies after super.distribTearDown
- if (!proxies.isEmpty()) {
- for (SocketProxy proxy : proxies.values()) {
- proxy.close();
- }
+ System.clearProperty("zkHost");
+ System.clearProperty("numShards");
}
}
+
+ @Override
+ protected void destroyServers() throws Exception {
+ ForkJoinPool customThreadPool = new ForkJoinPool(6);
+
+ customThreadPool.submit(() -> Collections.singleton(commonCloudSolrClient).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ customThreadPool.submit(() -> Collections.singleton(controlClient).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ customThreadPool.submit(() -> coreClients.parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ customThreadPool.submit(() -> Collections.singletonList(controlClientCloud).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ customThreadPool.submit(() -> Collections.singletonList(cloudClient).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ ExecutorUtil.shutdownAndAwaitTermination(customThreadPool);
+
+ coreClients.clear();
+
+ super.destroyServers();
+ }
@Override
protected void commit() throws Exception {
@@ -1590,33 +1689,16 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
cloudClient.commit();
}
- @Override
- protected void destroyServers() throws Exception {
- if (controlJetty != null) {
- ChaosMonkey.stop(controlJetty);
- }
- for (JettySolrRunner jetty : jettys) {
- try {
- ChaosMonkey.stop(jetty);
- } catch (Exception e) {
- log.error("", e);
- }
- }
- for (SolrClient client : coreClients) client.close();
- coreClients.clear();
- super.destroyServers();
- }
-
- protected CollectionAdminResponse createCollection(String collectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode) throws SolrServerException, IOException {
+ protected CollectionAdminResponse createCollection(String collectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode) throws SolrServerException, IOException, InterruptedException, TimeoutException {
return createCollection(null, collectionName, configSetName, numShards, replicationFactor, maxShardsPerNode, null, null);
}
- protected CollectionAdminResponse createCollection(Map<String,List<Integer>> collectionInfos, String collectionName, Map<String,Object> collectionProps, SolrClient client) throws SolrServerException, IOException{
+ protected CollectionAdminResponse createCollection(Map<String,List<Integer>> collectionInfos, String collectionName, Map<String,Object> collectionProps, SolrClient client) throws SolrServerException, IOException, InterruptedException, TimeoutException{
return createCollection(collectionInfos, collectionName, collectionProps, client, "conf1");
}
// TODO: Use CollectionAdminRequest#createCollection() instead of a raw request
- protected CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos, String collectionName, Map<String, Object> collectionProps, SolrClient client, String confSetName) throws SolrServerException, IOException{
+ protected CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos, String collectionName, Map<String, Object> collectionProps, SolrClient client, String confSetName) throws SolrServerException, IOException, InterruptedException, TimeoutException{
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.CREATE.toString());
for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
@@ -1675,12 +1757,19 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
} else {
res.setResponse(client.request(request));
}
+
+ try {
+ cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(numShards,
+ numShards * (numNrtReplicas + numTlogReplicas + numPullReplicas)));
+ } catch (TimeoutException e) {
+ new RuntimeException("Timeout waiting for " + numShards + " shards and " + (numNrtReplicas + numTlogReplicas + numPullReplicas) + " replicas.", e);
+ }
return res;
}
protected CollectionAdminResponse createCollection(Map<String,List<Integer>> collectionInfos,
- String collectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode, SolrClient client, String createNodeSetStr) throws SolrServerException, IOException {
+ String collectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode, SolrClient client, String createNodeSetStr) throws SolrServerException, IOException, InterruptedException, TimeoutException {
int numNrtReplicas = useTlogReplicas()?0:replicationFactor;
int numTlogReplicas = useTlogReplicas()?replicationFactor:0;
@@ -1696,7 +1785,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
protected CollectionAdminResponse createCollection(Map<String, List<Integer>> collectionInfos,
- String collectionName, int numShards, int replicationFactor, int maxShardsPerNode, SolrClient client, String createNodeSetStr, String configName) throws SolrServerException, IOException {
+ String collectionName, int numShards, int replicationFactor, int maxShardsPerNode, SolrClient client, String createNodeSetStr, String configName) throws SolrServerException, IOException, InterruptedException, TimeoutException {
int numNrtReplicas = useTlogReplicas()?0:replicationFactor;
int numTlogReplicas = useTlogReplicas()?replicationFactor:0;
@@ -1912,7 +2001,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
protected void createCollectionRetry(String testCollectionName, String configSetName, int numShards, int replicationFactor, int maxShardsPerNode)
- throws SolrServerException, IOException {
+ throws SolrServerException, IOException, InterruptedException, TimeoutException {
CollectionAdminResponse resp = createCollection(testCollectionName, configSetName, numShards, replicationFactor, maxShardsPerNode);
if (resp.getResponse().get("failure") != null) {
CollectionAdminRequest.Delete req = CollectionAdminRequest.deleteCollection(testCollectionName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb652b84/solr/test-framework/src/java/org/apache/solr/cloud/AbstractZkTestCase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractZkTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractZkTestCase.java
index 7461c4c..47ef259 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractZkTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractZkTestCase.java
@@ -16,23 +16,15 @@
*/
package org.apache.solr.cloud;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+
import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
-import org.apache.zookeeper.CreateMode;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* Base test class for ZooKeeper tests.
*/
@@ -43,21 +35,20 @@ public abstract class AbstractZkTestCase extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
public static File SOLRHOME;
static {
try {
- SOLRHOME = new File(TEST_HOME());
+ SOLRHOME = new File(SolrTestCaseJ4.TEST_HOME());
} catch (RuntimeException e) {
log.warn("TEST_HOME() does not exist - solrj test?");
// solrj tests not working with TEST_HOME()
// must override getSolrHome
}
}
-
- protected static ZkTestServer zkServer;
- protected static String zkDir;
+ protected volatile static ZkTestServer zkServer;
+
+ protected volatile static String zkDir;
@BeforeClass
@@ -71,71 +62,13 @@ public abstract class AbstractZkTestCase extends SolrTestCaseJ4 {
System.setProperty("jetty.port", "0000");
System.setProperty(ZOOKEEPER_FORCE_SYNC, "false");
- buildZooKeeper(zkServer.getZkHost(), zkServer.getZkAddress(), SOLRHOME,
+ zkServer.buildZooKeeper(SOLRHOME,
"solrconfig.xml", "schema.xml");
initCore("solrconfig.xml", "schema.xml");
}
- static void buildZooKeeper(String zkHost, String zkAddress, String config,
- String schema) throws Exception {
- buildZooKeeper(zkHost, zkAddress, SOLRHOME, config, schema);
- }
-
- // static to share with distrib test
- public static void buildZooKeeper(String zkHost, String zkAddress, File solrhome, String config,
- String schema) throws Exception {
- SolrZkClient zkClient = new SolrZkClient(zkHost, AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT, null);
- zkClient.makePath("/solr", false, true);
- zkClient.close();
-
- zkClient = new SolrZkClient(zkAddress, AbstractZkTestCase.TIMEOUT);
-
- Map<String,Object> props = new HashMap<>();
- props.put("configName", "conf1");
- final ZkNodeProps zkProps = new ZkNodeProps(props);
-
- zkClient.makePath("/collections/collection1", Utils.toJSON(zkProps), CreateMode.PERSISTENT, true);
- zkClient.makePath("/collections/collection1/shards", CreateMode.PERSISTENT, true);
- zkClient.makePath("/collections/control_collection", Utils.toJSON(zkProps), CreateMode.PERSISTENT, true);
- zkClient.makePath("/collections/control_collection/shards", CreateMode.PERSISTENT, true);
- // this workaround is acceptable until we remove legacyCloud because we just init a single core here
- String defaultClusterProps = "{\""+ZkStateReader.LEGACY_CLOUD+"\":\"true\"}";
- zkClient.makePath(ZkStateReader.CLUSTER_PROPS, defaultClusterProps.getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, true);
- // for now, always upload the config and schema to the canonical names
- putConfig("conf1", zkClient, solrhome, config, "solrconfig.xml");
- putConfig("conf1", zkClient, solrhome, schema, "schema.xml");
-
- putConfig("conf1", zkClient, solrhome, "solrconfig.snippet.randomindexconfig.xml");
- putConfig("conf1", zkClient, solrhome, "stopwords.txt");
- putConfig("conf1", zkClient, solrhome, "protwords.txt");
- putConfig("conf1", zkClient, solrhome, "currency.xml");
- putConfig("conf1", zkClient, solrhome, "enumsConfig.xml");
- putConfig("conf1", zkClient, solrhome, "open-exchange-rates.json");
- putConfig("conf1", zkClient, solrhome, "mapping-ISOLatin1Accent.txt");
- putConfig("conf1", zkClient, solrhome, "old_synonyms.txt");
- putConfig("conf1", zkClient, solrhome, "synonyms.txt");
- zkClient.close();
- }
- public static void putConfig(String confName, SolrZkClient zkClient, File solrhome, final String name)
- throws Exception {
- putConfig(confName, zkClient, solrhome, name, name);
- }
-
- public static void putConfig(String confName, SolrZkClient zkClient, File solrhome, final String srcName, String destName)
- throws Exception {
- File file = new File(solrhome, "collection1"
- + File.separator + "conf" + File.separator + srcName);
- if (!file.exists()) {
- log.info("skipping " + file.getAbsolutePath() + " because it doesn't exist");
- return;
- }
-
- String destPath = "/configs/" + confName + "/" + destName;
- log.info("put " + file.getAbsolutePath() + " to " + destPath);
- zkClient.makePath(destPath, file, false, true);
- }
@Override
public void tearDown() throws Exception {
@@ -144,43 +77,27 @@ public abstract class AbstractZkTestCase extends SolrTestCaseJ4 {
@AfterClass
public static void azt_afterClass() throws Exception {
- deleteCore();
-
- System.clearProperty("zkHost");
- System.clearProperty("solr.test.sys.prop1");
- System.clearProperty("solr.test.sys.prop2");
- System.clearProperty("solrcloud.skip.autorecovery");
- System.clearProperty("jetty.port");
- System.clearProperty(ZOOKEEPER_FORCE_SYNC);
-
- if (zkServer != null) {
- zkServer.shutdown();
- zkServer = null;
- }
- zkDir = null;
- }
- protected void printLayout(String zkHost) throws Exception {
- SolrZkClient zkClient = new SolrZkClient(zkHost, AbstractZkTestCase.TIMEOUT);
- zkClient.printLayoutToStdOut();
- zkClient.close();
+ try {
+ deleteCore();
+ } finally {
+
+ System.clearProperty("zkHost");
+ System.clearProperty("solr.test.sys.prop1");
+ System.clearProperty("solr.test.sys.prop2");
+ System.clearProperty("solrcloud.skip.autorecovery");
+ System.clearProperty("jetty.port");
+ System.clearProperty(ZOOKEEPER_FORCE_SYNC);
+
+ if (zkServer != null) {
+ zkServer.shutdown();
+ zkServer = null;
+ }
+ zkDir = null;
+ }
}
- public static void makeSolrZkNode(String zkHost) throws Exception {
- SolrZkClient zkClient = new SolrZkClient(zkHost, TIMEOUT);
- zkClient.makePath("/solr", false, true);
- zkClient.close();
- }
-
- public static void tryCleanSolrZkNode(String zkHost) throws Exception {
- tryCleanPath(zkHost, "/solr");
- }
-
- static void tryCleanPath(String zkHost, String path) throws Exception {
- SolrZkClient zkClient = new SolrZkClient(zkHost, TIMEOUT);
- if (zkClient.exists(path, true)) {
- zkClient.clean(path);
- }
- zkClient.close();
+ protected void printLayout() throws Exception {
+ zkServer.printLayout();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb652b84/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
index 71e1b43..e2bb5db 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
@@ -18,7 +18,6 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -42,7 +41,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.TimeOut;
@@ -180,81 +178,10 @@ public class ChaosMonkey {
}
public void stopJetty(CloudJettyRunner cjetty) throws Exception {
- stop(cjetty.jetty);
+ cjetty.jetty.stop();
stops.incrementAndGet();
}
- public void killJetty(CloudJettyRunner cjetty) throws Exception {
- kill(cjetty);
- stops.incrementAndGet();
- }
-
- public void stopJetty(JettySolrRunner jetty) throws Exception {
- stops.incrementAndGet();
- stopJettySolrRunner(jetty);
- }
-
- private static void stopJettySolrRunner(JettySolrRunner jetty) throws Exception {
- assert(jetty != null);
- monkeyLog("stop jetty! " + jetty.getLocalPort());
- SolrDispatchFilter sdf = jetty.getSolrDispatchFilter();
- if (sdf != null) {
- try {
- sdf.destroy();
- } catch (Throwable t) {
- log.error("", t);
- }
- }
- try {
- jetty.stop();
- } catch (InterruptedException e) {
- log.info("Jetty stop interrupted - should be a test caused interruption, we will try again to be sure we shutdown");
- }
-
- if (!jetty.isStopped()) {
- jetty.stop();
- }
-
- if (!jetty.isStopped()) {
- throw new RuntimeException("could not stop jetty");
- }
- }
-
-
- public static void kill(List<JettySolrRunner> jettys) throws Exception {
- for (JettySolrRunner jetty : jettys) {
- kill(jetty);
- }
- }
-
- public static void kill(JettySolrRunner jetty) throws Exception {
-
- CoreContainer cores = jetty.getCoreContainer();
- if (cores != null) {
- if (cores.isZooKeeperAware()) {
- int zklocalport = ((InetSocketAddress) cores.getZkController()
- .getZkClient().getSolrZooKeeper().getSocketAddress()).getPort();
- IpTables.blockPort(zklocalport);
- }
- }
-
- IpTables.blockPort(jetty.getLocalPort());
-
- monkeyLog("kill jetty! " + jetty.getLocalPort());
-
- jetty.stop();
-
- stop(jetty);
-
- if (!jetty.isStopped()) {
- throw new RuntimeException("could not kill jetty");
- }
- }
-
- public static void kill(CloudJettyRunner cjetty) throws Exception {
- kill(cjetty.jetty);
- }
-
public void stopAll(int pauseBetweenMs) throws Exception {
Set<String> keys = shardToJetty.keySet();
List<Thread> jettyThreads = new ArrayList<>(keys.size());
@@ -286,7 +213,7 @@ public class ChaosMonkey {
for (String key : keys) {
List<CloudJettyRunner> jetties = shardToJetty.get(key);
for (CloudJettyRunner jetty : jetties) {
- start(jetty.jetty);
+ jetty.jetty.start();
}
}
}
@@ -346,7 +273,7 @@ public class ChaosMonkey {
public CloudJettyRunner killRandomShard(String slice) throws Exception {
CloudJettyRunner cjetty = getRandomJetty(slice, aggressivelyKillLeaders);
if (cjetty != null) {
- killJetty(cjetty);
+ stopJetty(cjetty);
}
return cjetty;
}
@@ -365,12 +292,7 @@ public class ChaosMonkey {
}
// let's check the deadpool count
- int numRunning = 0;
- for (CloudJettyRunner cjetty : shardToJetty.get(slice)) {
- if (!deadPool.contains(cjetty)) {
- numRunning++;
- }
- }
+ int numRunning = getNumRunning(slice);
if (numRunning < 2) {
// we cannot kill anyone
@@ -378,6 +300,27 @@ public class ChaosMonkey {
return null;
}
+ if (numActive == 2) {
+ // we are careful
+ Thread.sleep(1000);
+
+ numActive = checkIfKillIsLegal(slice, numActive);
+
+ if (numActive < 2) {
+ // we cannot kill anyone
+ monkeyLog("only one active node in shard - monkey cannot kill :(");
+ return null;
+ }
+
+ numRunning = getNumRunning(slice);
+
+ if (numRunning < 2) {
+ // we cannot kill anyone
+ monkeyLog("only one active node in shard - monkey cannot kill :(");
+ return null;
+ }
+ }
+
boolean canKillIndexer = canKillIndexer(slice);
if (!canKillIndexer) {
@@ -445,6 +388,16 @@ public class ChaosMonkey {
return cjetty;
}
+ private int getNumRunning(String slice) {
+ int numRunning = 0;
+ for (CloudJettyRunner cjetty : shardToJetty.get(slice)) {
+ if (!deadPool.contains(cjetty)) {
+ numRunning++;
+ }
+ }
+ return numRunning;
+ }
+
private Type getTypeForJetty(String sliceName, CloudJettyRunner cjetty) {
DocCollection docCollection = zkStateReader.getClusterState().getCollection(collection);
@@ -594,7 +547,8 @@ public class ChaosMonkey {
if (!deadPool.isEmpty()) {
int index = chaosRandom.nextInt(deadPool.size());
JettySolrRunner jetty = deadPool.get(index).jetty;
- if (jetty.isStopped() && !ChaosMonkey.start(jetty)) {
+ if (jetty.isStopped()) {
+ jetty.start();
return;
}
deadPool.remove(index);
@@ -632,59 +586,14 @@ public class ChaosMonkey {
public static void stop(List<JettySolrRunner> jettys) throws Exception {
for (JettySolrRunner jetty : jettys) {
- stop(jetty);
+ jetty.stop();
}
}
- public static void stop(JettySolrRunner jetty) throws Exception {
- stopJettySolrRunner(jetty);
- }
-
public static void start(List<JettySolrRunner> jettys) throws Exception {
for (JettySolrRunner jetty : jettys) {
- start(jetty);
- }
- }
-
- public static boolean start(JettySolrRunner jetty) throws Exception {
- monkeyLog("starting jetty! " + jetty.getLocalPort());
- IpTables.unblockPort(jetty.getLocalPort());
- try {
jetty.start();
- } catch (Exception e) {
- jetty.stop();
- Thread.sleep(3000);
- try {
- jetty.start();
- } catch (Exception e2) {
- jetty.stop();
- Thread.sleep(10000);
- try {
- jetty.start();
- } catch (Exception e3) {
- jetty.stop();
- Thread.sleep(30000);
- try {
- jetty.start();
- } catch (Exception e4) {
- log.error("Could not get the port to start jetty again", e4);
- // we coud not get the port
- jetty.stop();
- return false;
- }
- }
- }
}
- CoreContainer cores = jetty.getCoreContainer();
- if (cores != null) {
- if (cores.isZooKeeperAware()) {
- int zklocalport = ((InetSocketAddress) cores.getZkController()
- .getZkClient().getSolrZooKeeper().getSocketAddress()).getPort();
- IpTables.unblockPort(zklocalport);
- }
- }
-
- return true;
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/eb652b84/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index f49870f..9b52b80 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -24,35 +24,52 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.embedded.SSLConfig;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.CloudCollectionsListener;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
@@ -98,7 +115,7 @@ public class MiniSolrCloudCluster {
" \n" +
"</solr>\n";
- private ZkTestServer zkServer; // non-final due to injectChaos()
+ private volatile ZkTestServer zkServer; // non-final due to injectChaos()
private final boolean externalZkServer;
private final List<JettySolrRunner> jettys = new CopyOnWriteArrayList<>();
private final Path baseDir;
@@ -226,7 +243,14 @@ public class MiniSolrCloudCluster {
if (!externalZkServer) {
String zkDir = baseDir.resolve("zookeeper/server1/data").toString();
zkTestServer = new ZkTestServer(zkDir);
- zkTestServer.run();
+ try {
+ zkTestServer.run();
+ } catch (Exception e) {
+ log.error("Error starting Zk Test Server, trying again ...");
+ zkTestServer.shutdown();
+ zkTestServer = new ZkTestServer(zkDir);
+ zkTestServer.run();
+ }
}
this.zkServer = zkTestServer;
@@ -260,46 +284,73 @@ public class MiniSolrCloudCluster {
throw startupError;
}
- waitForAllNodes(numServers, 60);
-
solrClient = buildSolrClient();
+
+ if (numServers > 0) {
+ waitForAllNodes(numServers, 60);
+ }
+
}
- private void waitForAllNodes(int numServers, int timeout) throws IOException, InterruptedException {
- try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
- int numliveNodes = 0;
- int retries = timeout;
- String liveNodesPath = "/solr/live_nodes";
- // Wait up to {timeout} seconds for number of live_nodes to match up number of servers
- do {
- if (zkClient.exists(liveNodesPath, true)) {
- numliveNodes = zkClient.getChildren(liveNodesPath, null, true).size();
- if (numliveNodes == numServers) {
- break;
- }
- }
- retries--;
- if (retries == 0) {
- throw new IllegalStateException("Solr servers failed to register with ZK."
- + " Current count: " + numliveNodes + "; Expected count: " + numServers);
+ private void waitForAllNodes(int numServers, int timeoutSeconds) throws IOException, InterruptedException, TimeoutException {
+
+ executorLauncher.shutdown();
+
+ ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+
+ int numRunning = 0;
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+
+ while (true) {
+ if (timeout.hasTimedOut()) {
+ throw new IllegalStateException("giving up waiting for all jetty instances to be running. numServers=" + numServers
+ + " numRunning=" + numRunning);
+ }
+ numRunning = 0;
+ for (JettySolrRunner jetty : getJettySolrRunners()) {
+ if (jetty.isRunning()) {
+ numRunning++;
}
-
- Thread.sleep(1000);
- } while (numliveNodes != numServers);
+ }
+ if (numServers == numRunning) {
+ break;
+ }
+ Thread.sleep(100);
}
- catch (KeeperException e) {
- throw new IOException("Error communicating with zookeeper", e);
+
+ ZkStateReader reader = getSolrClient().getZkStateReader();
+ for (JettySolrRunner jetty : getJettySolrRunners()) {
+ reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> n.contains(jetty.getNodeName()));
}
}
+ public void waitForNode(JettySolrRunner jetty, int timeoutSeconds)
+ throws IOException, InterruptedException, TimeoutException {
+
+ executorLauncher.shutdown();
+
+ ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+
+ ZkStateReader reader = getSolrClient().getZkStateReader();
+
+ reader.waitForLiveNodes(30, TimeUnit.SECONDS, (o, n) -> n.contains(jetty.getNodeName()));
+
+ }
+
/**
- * Wait for all Solr nodes to be live
+ * This method wait till all Solr JVMs ( Jettys ) are running . It waits up to the timeout (in seconds) for the JVMs to
+ * be up before throwing IllegalStateException. This is called automatically on cluster startup and so is only needed
+ * when starting additional Jetty instances.
*
- * @param timeout number of seconds to wait before throwing an IllegalStateException
- * @throws IOException if there was an error communicating with ZooKeeper
- * @throws InterruptedException if the calling thread is interrupted during the wait operation
+ * @param timeout
+ * number of seconds to wait before throwing an IllegalStateException
+ * @throws IOException
+ * if there was an error communicating with ZooKeeper
+ * @throws InterruptedException
+ * if the calling thread is interrupted during the wait operation
+ * @throws TimeoutException on timeout before all nodes being ready
*/
- public void waitForAllNodes(int timeout) throws IOException, InterruptedException {
+ public void waitForAllNodes(int timeout) throws IOException, InterruptedException, TimeoutException {
waitForAllNodes(jettys.size(), timeout);
}
@@ -455,11 +506,67 @@ public class MiniSolrCloudCluster {
/** Delete all collections (and aliases) */
public void deleteAllCollections() throws Exception {
try (ZkStateReader reader = new ZkStateReader(solrClient.getZkStateReader().getZkClient())) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ reader.registerCloudCollectionsListener(new CloudCollectionsListener() {
+
+ @Override
+ public void onChange(Set<String> oldCollections, Set<String> newCollections) {
+ if (newCollections != null && newCollections.size() == 0) {
+ latch.countDown();
+ }
+ }
+ });
+
reader.createClusterStateWatchersAndUpdate(); // up to date aliases & collections
reader.aliasesManager.applyModificationAndExportToZk(aliases -> Aliases.EMPTY);
for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
CollectionAdminRequest.deleteCollection(collection).process(solrClient);
}
+
+ boolean success = latch.await(60, TimeUnit.SECONDS);
+ if (!success) {
+ throw new IllegalStateException("Still waiting to see all collections removed from clusterstate.");
+ }
+
+ for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
+ reader.waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null ? true : false);
+ }
+
+ }
+
+ // may be deleted, but may not be gone yet - we only wait to not see it in ZK, not for core unloads
+ TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while (true) {
+
+ if( timeout.hasTimedOut() ) {
+ throw new TimeoutException("Timed out waiting for all collections to be fully removed.");
+ }
+
+ boolean allContainersEmpty = true;
+ for(JettySolrRunner jetty : jettys) {
+ CoreContainer cc = jetty.getCoreContainer();
+ if (cc != null && cc.getCores().size() != 0) {
+ allContainersEmpty = false;
+ }
+ }
+ if (allContainersEmpty) {
+ break;
+ }
+ }
+
+ }
+
+ public void deleteAllConfigSets() throws SolrServerException, IOException {
+
+ List<String> configSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
+
+ for (String configSet : configSetNames) {
+ if (configSet.equals("_default")) {
+ continue;
+ }
+ new ConfigSetAdminRequest.Delete()
+ .setConfigSetName(configSet)
+ .process(solrClient);
}
}
@@ -509,7 +616,7 @@ public class MiniSolrCloudCluster {
protected CloudSolrClient buildSolrClient() {
return new Builder(Collections.singletonList(getZkServer().getZkAddress()), Optional.empty())
- .build();
+ .withSocketTimeout(90000).withConnectionTimeout(15000).build(); // socket 90000 / connect 15000 (presumably ms); socket timeout is generous because we run in some harsh envs
}
private static String getHostContextSuitableForServletContext(String ctx) {
@@ -564,14 +671,14 @@ public class MiniSolrCloudCluster {
}
}
- public void injectChaos(Random random) throws Exception {
+ public synchronized void injectChaos(Random random) throws Exception {
// sometimes we restart one of the jetty nodes
if (random.nextBoolean()) {
JettySolrRunner jetty = jettys.get(random.nextInt(jettys.size()));
- ChaosMonkey.stop(jetty);
+ jetty.stop();
log.info("============ Restarting jetty");
- ChaosMonkey.start(jetty);
+ jetty.start();
}
// sometimes we restart zookeeper
@@ -579,7 +686,7 @@ public class MiniSolrCloudCluster {
zkServer.shutdown();
log.info("============ Restarting zookeeper");
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
- zkServer.run();
+ zkServer.run(false);
}
// sometimes we cause a connection loss - sometimes it will hit the overseer
@@ -588,4 +695,91 @@ public class MiniSolrCloudCluster {
ChaosMonkey.causeConnectionLoss(jetty);
}
}
+
+ /**
+  * Returns an open {@link Overseer}, chosen in random order, from the nodes of this cluster
+  * that currently have a {@link CoreContainer}.
+  *
+  * @return an Overseer that is not closed
+  * @throws SolrException with {@code NOT_FOUND} if no open Overseer exists
+  */
+ public Overseer getOpenOverseer() {
+   List<Overseer> overseers = new ArrayList<>();
+   for (int i = 0; i < jettys.size(); i++) {
+     // Read the container once: a node restarted concurrently (e.g. by injectChaos) can lose
+     // its container between a null check and a second getCoreContainer() call.
+     CoreContainer cc = getJettySolrRunner(i).getCoreContainer();
+     if (cc != null) {
+       overseers.add(cc.getZkController().getOverseer());
+     }
+   }
+
+   return getOpenOverseer(overseers);
+ }
+
+ /**
+  * Shuffles a copy of {@code overseers} with the test's random source and returns the first
+  * one that is not closed. The caller's list is not modified.
+  *
+  * @param overseers candidate Overseers
+  * @return an Overseer whose {@code isClosed()} is false
+  * @throws SolrException with {@code ErrorCode.NOT_FOUND} if no candidate is open
+  */
+ public static Overseer getOpenOverseer(List<Overseer> overseers) {
+   List<Overseer> candidates = new ArrayList<>(overseers);
+   Collections.shuffle(candidates, LuceneTestCase.random());
+   return candidates.stream()
+       .filter(candidate -> !candidate.isClosed())
+       .findFirst()
+       .orElseThrow(() -> new SolrException(ErrorCode.NOT_FOUND, "No open Overseer found"));
+ }
+
+ /**
+  * Waits until {@code collection} has exactly {@code shards} slices and exactly
+  * {@code totalReplicas} active replicas, as judged by
+  * {@link #expectedShardsAndActiveReplicas(int, int)}.
+  *
+  * @param collection name of the collection to watch
+  * @param wait maximum time to wait
+  * @param unit unit of {@code wait}
+  * @param shards expected number of slices
+  * @param totalReplicas expected number of active replicas across all slices
+  * @throws RuntimeException if the wait times out or the thread is interrupted; the message
+  *     contains the live nodes and collection state last observed (the original exception is
+  *     attached as the cause)
+  */
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas) {
+   CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas);
+
+   // remember the latest observation so a failure can report what was actually seen
+   AtomicReference<DocCollection> state = new AtomicReference<>();
+   AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
+   try {
+     getSolrClient().waitForState(collection, wait, unit, (n, c) -> {
+       state.set(c);
+       liveNodesLastSeen.set(n);
+
+       return predicate.matches(n, c);
+     });
+   } catch (TimeoutException | InterruptedException e) {
+     if (e instanceof InterruptedException) {
+       // restore the flag so callers can still observe the interruption
+       Thread.currentThread().interrupt();
+     }
+     // the predicate may never have been invoked, leaving no recorded live nodes
+     Set<String> liveNodes = liveNodesLastSeen.get();
+     String liveNodesText = liveNodes == null ? "null" : Arrays.toString(liveNodes.toArray());
+     throw new RuntimeException("Failed while waiting for active collection" + "\n" + e.getMessage() + "\nLive Nodes: " + liveNodesText
+         + "\nLast available state: " + state.get(), e);
+   }
+
+ }
+
+ /**
+  * Same as {@link #waitForActiveCollection(String, long, TimeUnit, int, int)} with a wait of
+  * 30 seconds.
+  */
+ public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
+   final long defaultWaitSeconds = 30L;
+   waitForActiveCollection(collection, defaultWaitSeconds, TimeUnit.SECONDS, shards, totalReplicas);
+ }
+
+ /**
+  * Builds a predicate that matches when the collection state is present, has exactly
+  * {@code expectedShards} slices, and has exactly {@code expectedReplicas} replicas that are
+  * active with respect to the supplied live nodes, counted across all slices.
+  *
+  * @param expectedShards required number of slices
+  * @param expectedReplicas required total number of active replicas
+  * @return the predicate
+  */
+ public static CollectionStatePredicate expectedShardsAndActiveReplicas(int expectedShards, int expectedReplicas) {
+   return (liveNodes, collectionState) -> {
+     if (collectionState == null || collectionState.getSlices().size() != expectedShards) {
+       return false;
+     }
+
+     int active = 0;
+     for (Slice slice : collectionState) {
+       for (Replica replica : slice) {
+         if (replica.isActive(liveNodes)) {
+           active++;
+         }
+       }
+     }
+     return active == expectedReplicas;
+   };
+ }
+
+ public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {
+ TimeOut timeout = new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ while(!timeout.hasTimedOut()) {
+ if (runner.isStopped()) {
+ break;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ if (timeout.hasTimedOut()) {
+ throw new TimeoutException("Waiting for Jetty to stop timed out");
+ }
+ }
}