You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/20 16:46:20 UTC
[lucene-solr] branch reference_impl updated: @1469 Cleanup the
cleanup.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl by this push:
new dcb9b91 @1469 Cleanup the cleanup.
dcb9b91 is described below
commit dcb9b9154fd414dc7be17b28de4cb143fe645d16
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Mar 20 11:45:30 2021 -0500
@1469 Cleanup the cleanup.
Took 15 hours 9 minutes
---
.../lucene/index/ConcurrentMergeScheduler.java | 4 +-
.../client/solrj/embedded/JettySolrRunner.java | 19 +-
.../java/org/apache/solr/cloud/LeaderElector.java | 10 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 64 ++--
.../apache/solr/cloud/OverseerElectionContext.java | 2 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 51 ++--
.../java/org/apache/solr/cloud/StatePublisher.java | 109 ++++---
.../java/org/apache/solr/cloud/ZkController.java | 103 +++----
.../solr/cloud/overseer/CollectionMutator.java | 3 +-
.../apache/solr/cloud/overseer/OverseerAction.java | 2 -
.../apache/solr/cloud/overseer/ZkStateWriter.java | 64 ++--
.../apache/solr/core/CachingDirectoryFactory.java | 4 +-
.../java/org/apache/solr/core/CoreContainer.java | 94 +++---
.../src/java/org/apache/solr/core/SolrCore.java | 49 ++-
.../src/java/org/apache/solr/core/SolrCores.java | 20 +-
.../src/java/org/apache/solr/core/ZkContainer.java | 30 --
.../java/org/apache/solr/handler/IndexFetcher.java | 14 +-
.../apache/solr/handler/RequestHandlerBase.java | 10 +-
.../org/apache/solr/handler/admin/ColStatus.java | 5 +-
.../solr/handler/admin/CoreAdminOperation.java | 3 +
.../apache/solr/handler/admin/PrepRecoveryOp.java | 14 +-
.../handler/component/RealTimeGetComponent.java | 3 +-
.../java/org/apache/solr/servlet/HttpSolrCall.java | 110 +++----
.../apache/solr/update/DefaultSolrCoreState.java | 48 ++-
.../apache/solr/update/DirectUpdateHandler2.java | 2 +-
.../org/apache/solr/update/SolrCmdDistributor.java | 19 +-
.../java/org/apache/solr/update/SolrCoreState.java | 17 +-
.../processor/DistributedZkUpdateProcessor.java | 66 ++++-
.../java/org/apache/solr/util/TestInjection.java | 2 +-
.../cloud/AssignBackwardCompatibilityTest.java | 3 +-
.../apache/solr/cloud/BasicDistributedZk2Test.java | 4 +-
.../solr/cloud/ChaosMonkeySafeLeaderTest.java | 7 +
.../solr/cloud/CollectionStateZnodeTest.java | 20 +-
.../apache/solr/cloud/CollectionsAPISolrJTest.java | 1 -
.../org/apache/solr/cloud/ConfigSetsAPITest.java | 15 +-
.../solr/cloud/DeleteInactiveReplicaTest.java | 2 +-
.../test/org/apache/solr/cloud/DeleteNodeTest.java | 6 +-
.../org/apache/solr/cloud/DeleteReplicaTest.java | 96 +++---
.../org/apache/solr/cloud/DeleteShardTest.java | 4 +-
.../org/apache/solr/cloud/DeleteStatusTest.java | 2 -
.../solr/cloud/HttpPartitionOnCommitTest.java | 6 +-
.../solr/cloud/LeaderElectionContextKeyTest.java | 16 +-
.../org/apache/solr/cloud/LeaderElectionTest.java | 4 +-
.../org/apache/solr/cloud/MigrateRouteKeyTest.java | 4 +-
.../apache/solr/cloud/PeerSyncReplicationTest.java | 1 -
.../solr/cloud/RecoveryAfterSoftCommitTest.java | 14 +-
.../apache/solr/cloud/ReplicationFactorTest.java | 4 +-
.../apache/solr/cloud/SolrCloudBridgeTestCase.java | 67 +++--
.../test/org/apache/solr/cloud/SplitShardTest.java | 2 +-
.../test/org/apache/solr/cloud/SyncSliceTest.java | 13 +-
.../org/apache/solr/cloud/TestCloudRecovery2.java | 127 ++++++--
.../solr/cloud/TestDistribDocBasedVersion.java | 4 +-
.../solr/cloud/TestDownShardTolerantSearch.java | 33 ++-
.../org/apache/solr/cloud/TestHashPartitioner.java | 2 +-
.../org/apache/solr/cloud/TestPullReplica.java | 30 +-
.../CollectionsAPIDistClusterPerZkTest.java | 41 ++-
.../test/org/apache/solr/core/CoreSorterTest.java | 11 +-
.../solr/core/ExitableDirectoryReaderTest.java | 3 -
.../solr/core/SolrCoreCheckLockOnStartupTest.java | 2 -
.../test/org/apache/solr/core/TestBadConfig.java | 2 -
.../org/apache/solr/core/TestCodecSupport.java | 19 +-
.../org/apache/solr/core/TestCustomStream.java | 14 +-
.../org/apache/solr/core/TestJmxIntegration.java | 2 +-
.../apache/solr/core/TestSolrConfigHandler.java | 1 +
.../component/ResourceSharingTestComponent.java | 9 +-
.../handler/component/ShardsWhitelistTest.java | 12 +-
.../solr/metrics/SolrMetricsIntegrationTest.java | 9 +
.../security/JWTAuthPluginIntegrationTest.java | 2 +-
.../update/TestInPlaceUpdateWithRouteField.java | 2 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 6 +-
.../solrj/impl/ZkClientClusterStateProvider.java | 10 -
.../org/apache/solr/common/cloud/ClusterState.java | 5 +-
.../apache/solr/common/cloud/DocCollection.java | 31 +-
.../java/org/apache/solr/common/cloud/Replica.java | 7 +-
.../java/org/apache/solr/common/cloud/Slice.java | 2 +-
.../org/apache/solr/common/cloud/SolrZkClient.java | 3 +-
.../apache/solr/common/cloud/ZkStateReader.java | 327 ++++++++++++++-------
.../solr/common/util/SolrInternalHttpClient.java | 7 +-
.../solr/common/util/SolrQueuedThreadPool.java | 65 ++--
.../src/java/org/apache/solr/SolrTestCase.java | 3 +-
.../solr/cloud/AbstractDistribZkTestBase.java | 2 +-
.../solr/cloud/AbstractFullDistribZkTestBase.java | 21 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 41 ++-
.../src/resources/logconf/log4j2-startup-debug.xml | 1 +
84 files changed, 1203 insertions(+), 885 deletions(-)
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index cece582..cb1c40c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -410,7 +410,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
private synchronized void initDynamicDefaults(Directory directory) throws IOException {
if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) {
- boolean spins = IOUtils.spins(directory);
+ boolean spins = false;
// Let tests override this to help reproducing a failure on a machine that has a different
// core count than the one where the test originally failed:
@@ -418,6 +418,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
String value = System.getProperty(DEFAULT_SPINS_PROPERTY);
if (value != null) {
spins = Boolean.parseBoolean(value);
+ } else {
+ spins = IOUtils.spins(directory);
}
} catch (Exception ignored) {
// that's fine we might hit a SecurityException etc. here just continue
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 4eede2c..fbca201 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -246,27 +246,13 @@ public class JettySolrRunner implements Closeable {
* @param config the configuration
*/
public JettySolrRunner(String solrHome, Properties nodeProperties, JettyConfig config) {
- this(solrHome, nodeProperties, config, false);
- }
-
- /**
- * Construct a JettySolrRunner
- *
- * After construction, you must start the jetty with {@link #start()}
- *
- * @param solrHome the solrHome to use
- * @param nodeProperties the container properties
- * @param config the configuration
- * @param enableProxy enables proxy feature to disable connections
- */
- public JettySolrRunner(String solrHome, Properties nodeProperties, JettyConfig config, boolean enableProxy) {
assert ObjectReleaseTracker.track(this);
SecurityManager s = System.getSecurityManager();
ThreadGroup group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
scheduler = new SolrScheduledExecutorScheduler("jetty-scheduler", null, group);
- this.enableProxy = enableProxy;
+ this.enableProxy = config.enableProxy;
this.solrHome = solrHome;
this.config = config;
this.nodeProperties = nodeProperties;
@@ -705,6 +691,7 @@ public class JettySolrRunner implements Closeable {
server.join();
} catch (InterruptedException e) {
SolrZkClient.checkInterrupted(e);
+ log.error("Interrupted waiting to stop", e);
throw new RuntimeException(e);
}
@@ -734,7 +721,7 @@ public class JettySolrRunner implements Closeable {
} catch (Exception e) {
SolrZkClient.checkInterrupted(e);
- log.error("", e);
+ log.error("Exception stopping jetty", e);
throw new RuntimeException(e);
} finally {
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 942c6d7..8aaf8c9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -196,6 +196,10 @@ public class LeaderElector implements Closeable {
} else {
+ if (state == LEADER || state == POT_LEADER) {
+ return false;
+ }
+
String toWatch = seqs.get(0);
for (String node : seqs) {
if (leaderSeqNodeName.equals(node)) {
@@ -213,6 +217,8 @@ public class LeaderElector implements Closeable {
IOUtils.closeQuietly(oldWatcher);
}
+ state = WAITING_IN_ELECTION;
+
watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context);
Stat exists = zkClient.exists(watchedNode, watcher);
if (exists == null) {
@@ -220,7 +226,7 @@ public class LeaderElector implements Closeable {
return true;
}
- state = WAITING_IN_ELECTION;
+
if (log.isDebugEnabled()) log.debug("Watching path {} to know if I could be the leader, my node is {}", watchedNode, context.leaderSeqPath);
return false;
@@ -268,7 +274,7 @@ public class LeaderElector implements Closeable {
// TODO: get this core param out of here
- protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
+ protected synchronized void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
if (state == CLOSED || isClosed) {
throw new AlreadyClosedException();
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 384d486..7cf6633 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -248,7 +248,7 @@ public class Overseer implements SolrCloseable {
this.zkStateWriter = new ZkStateWriter(zkController.getZkStateReader(), stats, this);
}
- public synchronized void start(String id, ElectionContext context) throws KeeperException {
+ public synchronized void start(String id, ElectionContext context, boolean weAreReplacement) throws KeeperException {
log.info("Starting Overseer");
if (getCoreContainer().isShutDown() || closeAndDone) {
if (log.isDebugEnabled()) log.debug("Already closed, exiting");
@@ -334,8 +334,8 @@ public class Overseer implements SolrCloseable {
queueWatcher = new WorkQueueWatcher(getCoreContainer(), this);
collectionQueueWatcher = new CollectionWorkQueueWatcher(getCoreContainer(), id, overseerLbClient, adminPath, stats, Overseer.this);
try {
- queueWatcher.start();
- collectionQueueWatcher.start();
+ queueWatcher.start(weAreReplacement);
+ collectionQueueWatcher.start(weAreReplacement);
} catch (InterruptedException e) {
log.warn("interrupted", e);
}
@@ -727,7 +727,7 @@ public class Overseer implements SolrCloseable {
this.path = path;
}
- public abstract void start() throws KeeperException, InterruptedException;
+ public abstract void start(boolean weAreReplacement) throws KeeperException, InterruptedException;
private List<String> getItems() {
try {
@@ -765,7 +765,7 @@ public class Overseer implements SolrCloseable {
try {
List<String> items = getItems();
if (items.size() > 0) {
- processQueueItems(items, false);
+ processQueueItems(items, false, false);
}
} catch (AlreadyClosedException e) {
@@ -778,7 +778,7 @@ public class Overseer implements SolrCloseable {
}
- protected abstract void processQueueItems(List<String> items, boolean onStart);
+ protected abstract void processQueueItems(List<String> items, boolean onStart, boolean weAreReplacement);
@Override
public void close() {
@@ -803,19 +803,19 @@ public class Overseer implements SolrCloseable {
super(cc, overseer, Overseer.OVERSEER_QUEUE);
}
- public void start() throws KeeperException, InterruptedException {
+ public void start(boolean weAreReplacement) throws KeeperException, InterruptedException {
if (closed) return;
zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
startItems = super.getItems();
log.info("Overseer found entries on start {} {}", startItems, path);
if (startItems.size() > 0) {
- processQueueItems(startItems, true);
+ processQueueItems(startItems, true, weAreReplacement);
}
}
@Override
- protected void processQueueItems(List<String> items, boolean onStart) {
+ protected void processQueueItems(List<String> items, boolean onStart, boolean weAreReplacement) {
//if (closed) return;
List<String> fullPaths = new ArrayList<>(items.size());
CountDownLatch delCountDownLatch = null;
@@ -861,29 +861,20 @@ public class Overseer implements SolrCloseable {
stateUpdateMessage.getProperties().remove(StatePublisher.OPERATION);
for (Map.Entry<String,Object> stateUpdateEntry : stateUpdateMessage.getProperties().entrySet()) {
- if (OverseerAction.DOWNNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
- if (onStart) {
+ OverseerAction oa = OverseerAction.get(stateUpdateEntry.getKey());
+
+ if (OverseerAction.RECOVERYNODE.equals(oa) || OverseerAction.DOWNNODE.equals(oa)) {
+ if (OverseerAction.DOWNNODE.equals(oa) && onStart && !weAreReplacement) {
continue;
}
- Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
- String collId = Long.toString(docColl.getId());
- ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
- if (updates == null) {
- updates = new ConcurrentHashMap<>( );
- collStateUpdates.put(collId, updates);
- }
- List<Replica> replicas = docColl.getReplicas();
- for (Replica replica : replicas) {
- if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
- if (log.isDebugEnabled()) log.debug("set down node operation {} for replica {}", op, replica);
- ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
- update.id = replica.getId();
- update.state = Replica.State.getShortState(Replica.State.DOWN);
- updates.put(update.id, update);
- }
- }
- });
- } else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
+ Replica.State setState = null;
+ if (OverseerAction.DOWNNODE.equals(oa)) {
+ setState = Replica.State.DOWN;
+ } else if (OverseerAction.RECOVERYNODE.equals(oa)) {
+ setState = Replica.State.RECOVERING;
+ }
+
+ Replica.State finalSetState = setState;
Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
String collId = Long.toString(docColl.getId());
ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
@@ -894,10 +885,10 @@ public class Overseer implements SolrCloseable {
List<Replica> replicas = docColl.getReplicas();
for (Replica replica : replicas) {
if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
- if (log.isDebugEnabled()) log.debug("set recovery node operation {} for replica {}", op, replica);
+ if (log.isDebugEnabled()) log.debug("set {} node operation {} for replica {}", finalSetState, op, replica);
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.id = replica.getId();
- update.state = Replica.State.getShortState(Replica.State.RECOVERING);
+ update.state = Replica.State.getShortState(finalSetState);
updates.put(update.id, update);
}
}
@@ -906,7 +897,8 @@ public class Overseer implements SolrCloseable {
for (Map.Entry<String,Object> stateUpdateEntry2 : stateUpdateMessage.getProperties().entrySet()) {
// if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
- if (OverseerAction.DOWNNODE.equals(OverseerAction.get(stateUpdateEntry2.getKey())) || OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry2.getKey()))) {
+ OverseerAction oa2 = OverseerAction.get(stateUpdateEntry2.getKey());
+ if (OverseerAction.RECOVERYNODE.equals(oa2) || OverseerAction.DOWNNODE.equals(oa2)) {
continue;
}
String id = stateUpdateEntry2.getKey();
@@ -1053,7 +1045,7 @@ public class Overseer implements SolrCloseable {
}
@Override
- public void start() throws KeeperException, InterruptedException {
+ public void start(boolean weAreReplacement) throws KeeperException, InterruptedException {
if (closed) return;
zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
@@ -1062,12 +1054,12 @@ public class Overseer implements SolrCloseable {
log.info("Overseer found entries on start {}", startItems);
if (startItems.size() > 0) {
- processQueueItems(startItems, true);
+ processQueueItems(startItems, true, weAreReplacement);
}
}
@Override
- protected void processQueueItems(List<String> items, boolean onStart) {
+ protected void processQueueItems(List<String> items, boolean onStart, boolean weAreReplacement) {
if (closed) return;
ourLock.lock();
List<String> fullPaths = new ArrayList<>(items.size());
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index a7e3391..302bfee 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -95,7 +95,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
if (!overseer.getZkController().getCoreContainer().isShutDown() && !overseer.getZkController().isShutdownCalled()
&& !overseer.isDone()) {
log.info("Starting overseer after winning Overseer election {}", id);
- overseer.start(id, context);
+ overseer.start(id, context, weAreReplacement);
} else {
log.info("Will not start Overseer because we are closed");
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 013e285..0ecda92 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -185,6 +185,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (log.isDebugEnabled()) log.debug("Stopping recovery for core=[{}]", coreName);
+ if (latch != null) {
+ latch.countDown();
+ }
+
try {
if (prevSendPreRecoveryHttpUriRequest != null) {
prevSendPreRecoveryHttpUriRequest.cancel();
@@ -199,9 +203,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
finalReplicationHandler.abortFetch();
}
- if (latch != null) {
- latch.countDown();
- }
+
//ObjectReleaseTracker.release(this);
}
@@ -336,6 +338,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
} catch (AlreadyClosedException e) {
log.info("AlreadyClosedException, won't do recovery", e);
return;
+ } catch (RejectedExecutionException e) {
+ log.info("RejectedExecutionException, won't do recovery", e);
+ return;
} catch (Exception e) {
ParWork.propagateInterrupt(e);
log.error("Exception during recovery", e);
@@ -626,6 +631,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
while (!successfulRecovery && !isClosed() && !core.isClosing() && !core.isClosed()) {
cnt++;
try {
+
+ log.debug("Begin buffering updates. core=[{}]", coreName);
+ // recalling buffer updates will drop the old buffer tlog
+ if (ulog.getState() != UpdateLog.State.BUFFERING) {
+ ulog.bufferUpdates();
+ }
+
+
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
LeaderElector leaderElector = zkController.getLeaderElector(coreName);
@@ -644,10 +657,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
continue;
}
- log.debug("Begin buffering updates. core=[{}]", coreName);
- // recalling buffer updates will drop the old buffer tlog
- ulog.bufferUpdates();
-
// we wait a bit so that any updates on the leader
// that started before they saw recovering state
// are sure to have finished (see SOLR-7141 for
@@ -708,6 +717,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
didReplication = true;
try {
+ // recalling buffer updates will drop the old buffer tlog
+ if (ulog.getState() != UpdateLog.State.BUFFERING) {
+ ulog.bufferUpdates();
+ }
+
try {
if (prevSendPreRecoveryHttpUriRequest != null) {
prevSendPreRecoveryHttpUriRequest.cancel();
@@ -716,8 +730,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// okay
}
log.debug("Begin buffering updates. core=[{}]", coreName);
- // recalling buffer updates will drop the old buffer tlog
- ulog.bufferUpdates();
+
sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), zkStateReader.getClusterState().
getCollection(core.getCoreDescriptor().getCollectionName()).getSlice(cloudDesc.getShardId()), core.getCoreDescriptor());
@@ -736,8 +749,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Replication Recovery was successful.");
successfulRecovery = true;
- } catch (InterruptedException | AlreadyClosedException e) {
- log.info("Interrupted or already closed, bailing on recovery");
+ } catch (InterruptedException | AlreadyClosedException | RejectedExecutionException e) {
+ log.info("{} bailing on recovery", e.getClass().getSimpleName());
close = true;
successfulRecovery = false;
break;
@@ -772,7 +785,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
publishedActive = true;
close = true;
- } catch (AlreadyClosedException e) {
+ } catch (AlreadyClosedException | RejectedExecutionException e) {
log.error("Already closed");
successfulRecovery = false;
close = true;
@@ -956,8 +969,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Sending prep recovery command to {} for leader={} params={}", leaderBaseUrl, leaderCoreName, prepCmd.getParams());
- int conflictWaitMs = zkController.getLeaderConflictResolveWait();
- int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
+ int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "3000"));
+
+ if (isClosed()) {
+ throw new AlreadyClosedException();
+ }
try (Http2SolrClient client = new Http2SolrClient.Builder(leaderBaseUrl).withHttpClient(cc.getUpdateShardHandler().
getRecoveryOnlyClient()).idleTimeout(readTimeout).markInternalRequest().build()) {
@@ -969,6 +985,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
prevSendPreRecoveryHttpUriRequest = result;
try {
+
boolean success = latch.await(readTimeout, TimeUnit.MILLISECONDS);
if (!success) {
//result.cancel();
@@ -1008,12 +1025,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
@Override
public void onFailure(Throwable throwable, int code) {
- log.info("failed sending prep recovery cmd to leader");
+ log.info("failed sending prep recovery cmd to leader response code={}", code, throwable);
- if (throwable.getMessage().contains("Not the valid leader")) {
+ if (throwable != null && throwable.getMessage() != null && throwable.getMessage().contains("Not the valid leader")) {
try {
try {
- Thread.sleep(250);
+ Thread.sleep(10);
cc.getZkController().getZkStateReader().waitForState(RecoveryStrategy.this.collection, 3, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
if (collectionState == null) {
return false;
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 8fe7abb..bafa621 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -17,6 +17,7 @@
package org.apache.solr.cloud;
import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
@@ -62,8 +63,9 @@ public class StatePublisher implements Closeable {
}
static final String PREFIX = "qn-";
public static final NoOpMessage TERMINATE_OP = new NoOpMessage();
+ public static final ConcurrentHashMap TERMINATE_OP_MAP = new ConcurrentHashMap();
- private final ArrayBlockingQueue<ZkNodeProps> workQueue = new ArrayBlockingQueue<>(1024, true);
+ private final ArrayBlockingQueue<ConcurrentHashMap> workQueue = new ArrayBlockingQueue<>(1024, true);
private final ZkDistributedQueue overseerJobQueue;
private volatile Worker worker;
private volatile Future<?> workerFuture;
@@ -71,6 +73,9 @@ public class StatePublisher implements Closeable {
private volatile boolean terminated;
private class Worker implements Runnable {
+ public static final int POLL_TIME_ON_PUBLISH_NODE = 1;
+ public static final int POLL_TIME = 5;
+
Worker() {
}
@@ -79,20 +84,19 @@ public class StatePublisher implements Closeable {
public void run() {
while (!terminated) {
-// if (!zkStateReader.getZkClient().isConnected()) {
-// try {
-// zkStateReader.getZkClient().getConnectionManager().waitForConnected(5000);
-// } catch (TimeoutException e) {
-// continue;
-// } catch (InterruptedException e) {
-// log.error("publisher interrupted", e);
-// }
-// continue;
-// }
-
- ZkNodeProps message = null;
- ZkNodeProps bulkMessage = new ZkNodeProps();
- bulkMessage.getProperties().put(OPERATION, "state");
+ if (!zkStateReader.getZkClient().isAlive()) {
+ try {
+ zkStateReader.getZkClient().getConnectionManager().waitForConnected(5000);
+ } catch (AlreadyClosedException e) {
+ log.warn("Hit already closed exception while waiting for zkclient to reconnect");
+ return;
+ } catch (Exception e) {
+ continue;
+ }
+ }
+ ConcurrentHashMap message = null;
+ ConcurrentHashMap bulkMessage = new ConcurrentHashMap();
+ bulkMessage.put(OPERATION, "state");
int pollTime = 250;
try {
try {
@@ -104,14 +108,15 @@ public class StatePublisher implements Closeable {
if (message != null) {
log.debug("Got state message " + message);
- if (message == TERMINATE_OP) {
+ if (message == TERMINATE_OP_MAP) {
log.debug("State publish is terminated");
terminated = true;
+ pollTime = 1;
} else {
if (bulkMessage(message, bulkMessage)) {
- pollTime = 20;
+ pollTime = POLL_TIME_ON_PUBLISH_NODE;
} else {
- pollTime = 150;
+ pollTime = POLL_TIME;
}
}
@@ -124,13 +129,14 @@ public class StatePublisher implements Closeable {
}
if (message != null) {
if (log.isDebugEnabled()) log.debug("Got state message " + message);
- if (message == TERMINATE_OP) {
+ if (message == TERMINATE_OP_MAP) {
terminated = true;
+ pollTime = 1;
} else {
if (bulkMessage(message, bulkMessage)) {
- pollTime = 10;
+ pollTime = POLL_TIME_ON_PUBLISH_NODE;
} else {
- pollTime = 25;
+ pollTime = POLL_TIME;
}
}
} else {
@@ -139,7 +145,7 @@ public class StatePublisher implements Closeable {
}
}
- if (bulkMessage.getProperties().size() > 1) {
+ if (bulkMessage.size() > 1) {
processMessage(bulkMessage);
} else {
log.debug("No messages to publish, loop");
@@ -155,31 +161,32 @@ public class StatePublisher implements Closeable {
}
}
- private boolean bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
- if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.DOWNNODE) {
- String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+ private boolean bulkMessage(ConcurrentHashMap zkNodeProps, ConcurrentHashMap bulkMessage) {
+ if (OverseerAction.get((String) zkNodeProps.get(OPERATION)) == OverseerAction.DOWNNODE) {
+ String nodeName = (String) zkNodeProps.get(ZkStateReader.NODE_NAME_PROP);
//clearStatesForNode(bulkMessage, nodeName);
- bulkMessage.getProperties().put(OverseerAction.DOWNNODE.toLower(), nodeName);
+ bulkMessage.put(OverseerAction.DOWNNODE.toLower(), nodeName);
log.debug("bulk state publish down node, props={} result={}", zkNodeProps, bulkMessage);
-
- } else if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.RECOVERYNODE) {
+ return true;
+ } else if (OverseerAction.get((String) zkNodeProps.get(OPERATION)) == OverseerAction.RECOVERYNODE) {
log.debug("bulk state publish recovery node, props={} result={}", zkNodeProps, bulkMessage);
- String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+ String nodeName = (String) zkNodeProps.get(ZkStateReader.NODE_NAME_PROP);
// clearStatesForNode(bulkMessage, nodeName);
- bulkMessage.getProperties().put(OverseerAction.RECOVERYNODE.toLower(), nodeName);
+ bulkMessage.put(OverseerAction.RECOVERYNODE.toLower(), nodeName);
log.debug("bulk state publish recovery node, props={} result={}" , zkNodeProps, bulkMessage);
+ return true;
} else {
//String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
- String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
- String id = zkNodeProps.getStr("id");
- String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
+ String core = (String) zkNodeProps.get(ZkStateReader.CORE_NAME_PROP);
+ String id = (String) zkNodeProps.get("id");
+ String state = (String) zkNodeProps.get(ZkStateReader.STATE_PROP);
String line = Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT)));
if (log.isDebugEnabled()) log.debug("bulk publish core={} id={} state={} line={}", core, id, state, line);
- bulkMessage.getProperties().put(id, line);
- if (state.equals(Replica.State.RECOVERING.toString())) {
- return true;
- }
+ bulkMessage.put(id, line);
+// if (state.equals(Replica.State.RECOVERING.toString())) {
+// return true;
+// }
}
return false;
}
@@ -207,7 +214,7 @@ public class StatePublisher implements Closeable {
}
}
- private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
+ private void processMessage(ConcurrentHashMap message) throws KeeperException, InterruptedException {
log.info("Send state updates to Overseer {}", message);
byte[] updates = Utils.toJSON(message);
@@ -246,11 +253,12 @@ public class StatePublisher implements Closeable {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Nulls in published state " + stateMessage);
}
-// if ((state.equals(UpdateLog.State.ACTIVE.toString().toLowerCase(Locale.ROOT)) || state.equals("leader")) && cc.isCoreLoading(core)) {
-// cc.waitForLoadingCore(core, 10000);
-// }
-
DocCollection coll = zkStateReader.getClusterState().getCollectionOrNull(collection);
+
+ if (coll == null) {
+ zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
+ }
+
if (coll != null) {
Replica replica = coll.getReplica(core);
if (replica != null) {
@@ -262,10 +270,10 @@ public class StatePublisher implements Closeable {
CacheEntry lastState = stateCache.get(id);
//&& (System.currentTimeMillis() - lastState.time < 1000) &&
// TODO: needs work
-// if (state.equals(lastState.state)) {
-// log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
-// return;
-// }
+ if (replica != null && replica.getType() == Replica.Type.PULL && lastState != null && state.equals(lastState.state) && (System.currentTimeMillis() - lastState.time < 10000)) {
+ log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
+ return;
+ }
}
if (id == null) {
@@ -321,7 +329,11 @@ public class StatePublisher implements Closeable {
}
}
- workQueue.offer(stateMessage);
+ if (stateMessage == TERMINATE_OP) {
+ workQueue.offer(TERMINATE_OP_MAP);
+ } else {
+ workQueue.offer(new ConcurrentHashMap(stateMessage.getProperties()));
+ }
} catch (Exception e) {
log.error("Exception trying to publish state message={}", stateMessage, e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
@@ -342,9 +354,8 @@ public class StatePublisher implements Closeable {
}
public void close() {
- this.terminated = true;
try {
- workerFuture.cancel(false);
+ workerFuture.get();
} catch (Exception e) {
log.error("Exception waiting for close", e);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index c49425c..9406952 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -21,9 +21,7 @@ import org.apache.solr.client.solrj.cloud.DistributedLock;
import org.apache.solr.client.solrj.cloud.LockListener;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
-import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
@@ -52,7 +50,6 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
@@ -115,6 +112,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
/**
* Handle ZooKeeper interactions.
@@ -155,11 +153,6 @@ public class ZkController implements Closeable, Runnable {
@Override
public void run() {
- try {
- publishNodeAs(getNodeName(), OverseerAction.DOWNNODE);
- } catch (Exception e) {
- log.warn("Problem publish node as DOWN", e);
- }
disconnect(true);
log.info("Continuing to Solr shutdown");
}
@@ -575,18 +568,15 @@ public class ZkController implements Closeable, Runnable {
try (ParWork closer = new ParWork(this, true, false)) {
closer.collect("replicateFromLeaders", replicateFromLeaders);
closer.collect(leaderElectors);
+ }
-// if (publishDown) {
-// closer.collect("PublishNodeAsDown&RepFromLeaders", () -> {
-// try {
-// log.info("Publish this node as DOWN...");
-// publishNodeAs(getNodeName(), OverseerAction.DOWNNODE);
-// } catch (Exception e) {
-// ParWork.propagateInterrupt("Error publishing nodes as down. Continuing to close CoreContainer", e);
-// }
-// return "PublishDown";
-// });
-// }
+
+ if (publishDown) {
+ try {
+ publishNodeAs(getNodeName(), OverseerAction.DOWNNODE);
+ } catch (Exception e) {
+ log.warn("Problem publish node as DOWN", e);
+ }
}
}
@@ -606,19 +596,14 @@ public class ZkController implements Closeable, Runnable {
closer.collect(cloudManager);
closer.collect(cloudSolrClient);
- closer.collect("", () -> {
- try {
- if (statePublisher != null) {
- statePublisher.submitState(StatePublisher.TERMINATE_OP);
- }
- } catch (Exception e) {
- log.error("Exception closing state publisher");
- }
- });
-
collectionToTerms.forEach((s, zkCollectionTerms) -> closer.collect(zkCollectionTerms));
} finally {
+ if (statePublisher != null) {
+ statePublisher.submitState(StatePublisher.TERMINATE_OP);
+ }
+
+ IOUtils.closeQuietly(statePublisher);
IOUtils.closeQuietly(overseerElector);
if (overseer != null) {
try {
@@ -1075,6 +1060,13 @@ public class ZkController implements Closeable, Runnable {
if (cc != null) cc.securityNodeChanged();
});
zkStateReader.setNode(nodeName);
+ zkStateReader.setLeaderChecker(name -> {
+ LeaderElector elector = leaderElectors.get(name);
+ if (elector != null && elector.isLeader()) {
+ return true;
+ }
+ return false;
+ });
zkStateReader.setCollectionRemovedListener(this::removeCollectionTerms);
this.baseURL = zkStateReader.getBaseUrlForNodeName(this.nodeName);
@@ -1247,6 +1239,8 @@ public class ZkController implements Closeable, Runnable {
return overseerElector != null && overseerElector.isLeader();
}
+ public static volatile Predicate<CoreDescriptor> testing_beforeRegisterInZk;
+
/**
* Register shard with ZooKeeper.
*
@@ -1256,6 +1250,14 @@ public class ZkController implements Closeable, Runnable {
if (getCoreContainer().isShutDown() || isDcCalled()) {
throw new AlreadyClosedException();
}
+
+ if (testing_beforeRegisterInZk != null) {
+ boolean didTrigger = testing_beforeRegisterInZk.test(desc);
+ if (log.isDebugEnabled()) {
+ log.debug("{} pre-zk hook", (didTrigger ? "Ran" : "Skipped"));
+ }
+ }
+
MDCLoggingContext.setCoreName(desc.getName());
ZkShardTerms shardTerms = null;
// LeaderElector leaderElector = null;
@@ -1302,7 +1304,7 @@ public class ZkController implements Closeable, Runnable {
log.info("Wait to see leader for {}, {}", collection, shardId);
String leaderName = null;
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < 10; i++) {
if (isClosed() || isDcCalled() || cc.isShutDown()) {
throw new AlreadyClosedException();
}
@@ -1313,46 +1315,11 @@ public class ZkController implements Closeable, Runnable {
break;
}
try {
- Replica leader = zkStateReader.getLeaderRetry(collection, shardId, Integer.getInteger("solr.getleader.looptimeout", 5000));
+ Replica leader = zkStateReader.getLeaderRetry(getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(),collection, shardId, Integer.getInteger("solr.getleader.looptimeout", 2000), true);
leaderName = leader.getName();
+ break;
- boolean isLeader = leaderName.equals(coreName);
-
- if (isLeader) {
- if (leaderElector != null && leaderElector.isLeader()) {
- break;
- } else {
- Thread.sleep(100);
- }
- } else {
- boolean stop = true;
- CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
- prepCmd.setCoreName(leader.getName());
- prepCmd.setLeaderName(leader.getName());
- prepCmd.setCollection(collection);
- prepCmd.setShardId(shardId);
-
- int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
-
- try (Http2SolrClient client = new Http2SolrClient.Builder(leader.getBaseUrl()).idleTimeout(readTimeout).withHttpClient(cc.getUpdateShardHandler().getTheSharedHttpClient()).markInternalRequest().build()) {
-
- prepCmd.setBasePath(leader.getBaseUrl());
-
- try {
- NamedList<Object> result = client.request(prepCmd);
- } catch (Exception e) {
- log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
- stop = false;
- }
- }
- if (stop) {
- break;
- } else {
- Thread.sleep(100);
- }
- }
-
- } catch (TimeoutException timeoutException) {
+ } catch (TimeoutException | InterruptedException e) {
if (isClosed() || isDcCalled() || cc.isShutDown()) {
throw new AlreadyClosedException();
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 6fc989b..40f0446 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class CollectionMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -159,7 +160,7 @@ public class CollectionMutator {
}
return clusterState.copyWith(coll.getName(),
- new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), coll.getStateUpdates()));
+ new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), (ConcurrentHashMap) coll.getStateUpdates()));
}
public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
index abcd76c..c222586 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
@@ -18,8 +18,6 @@ package org.apache.solr.cloud.overseer;
import java.util.Locale;
-import org.apache.solr.common.ParWork;
-
/**
* Enum of actions supported by the overseer only.
*
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 5c98e6b..8ed9c3e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -99,9 +99,11 @@ public class ZkStateWriter {
String collectionName = docCollection.getName();
ColState collState = collLocks.compute(collectionName, (s, colState) -> {
if (colState == null) {
+ log.debug("create new collection lock for {}", collectionName);
ColState cState = new ColState();
return cState;
}
+ log.debug("use existing collection lock for {}", collectionName);
return colState;
});
collState.collLock.lock();
@@ -121,6 +123,7 @@ public class ZkStateWriter {
Slice currentSlice = currentCollection.getSlice(slice.getName());
if (currentSlice != null) {
if (currentSlice.get("remove") != null || slice.getProperties().get("remove") != null) {
+ log.debug("remove slice {}", slice.getName());
removeSlices.add(slice.getName());
} else {
currentCollection.getSlicesMap().put(slice.getName(), slice.update(currentSlice));
@@ -146,7 +149,11 @@ public class ZkStateWriter {
for (String removeSlice : removeSlices) {
currentCollection.getSlicesMap().remove(removeSlice);
}
- cs.put(currentCollection.getName(), currentCollection);
+
+ DocCollection newCollection = new DocCollection(collectionName, currentCollection.getSlicesMap(), currentCollection.getProperties(), currentCollection.getRouter(),
+ currentCollection.getZNodeVersion(), (ConcurrentHashMap) currentCollection.getStateUpdates());
+ log.debug("zkwriter newCollection={}", newCollection);
+ cs.put(currentCollection.getName(), newCollection);
} else {
docCollection.getProperties().remove("pullReplicas");
@@ -166,7 +173,9 @@ public class ZkStateWriter {
for (String removeSlice : removeSlices) {
docCollection.getSlicesMap().remove(removeSlice);
}
-
+ String path = ZkStateReader.getCollectionPath(collectionName);
+ Stat stat = reader.getZkClient().exists(path, null, false, false);
+ // docCollection.setZnodeVersion(stat.getVersion());
cs.put(docCollection.getName(), docCollection);
}
@@ -196,16 +205,23 @@ public class ZkStateWriter {
//throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
}
- ConcurrentHashMap updates = stateUpdates.get(collection);
- if (updates == null) {
- updates = new ConcurrentHashMap();
- stateUpdates.put(collection, updates);
- }
+ ConcurrentHashMap updates;
DocCollection docColl = cs.get(collection);
String csVersion;
if (docColl != null) {
- csVersion = Integer.toString(docColl.getZNodeVersion());
+
+ updates = stateUpdates.get(collection);
+ if (updates == null) {
+ updates = (ConcurrentHashMap) docColl.getStateUpdates();
+ if (updates == null) {
+ updates = new ConcurrentHashMap();
+ }
+ stateUpdates.put(collection, updates);
+ }
+
+ int clusterStateVersion = docColl.getZNodeVersion();
+ csVersion = Integer.toString(clusterStateVersion);
for (StateUpdate state : entry.getValue().values()) {
if (state.sliceState != null) {
Slice slice = docColl.getSlice(state.sliceName);
@@ -258,7 +274,8 @@ public class ZkStateWriter {
log.trace("add new slice leader={} {} {}", newSlice.getLeader(), newSlice, docColl);
- DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(), docColl.getStateUpdates());
+ DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(),
+ (ConcurrentHashMap) docColl.getStateUpdates());
cs.put(collection, newDocCollection);
docColl = newDocCollection;
updates.put(replica.getInternalId(), "l");
@@ -287,7 +304,8 @@ public class ZkStateWriter {
log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
- DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(), docColl.getStateUpdates());
+ DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(),
+ (ConcurrentHashMap) docColl.getStateUpdates());
cs.put(collection, newDocCollection);
docColl = newDocCollection;
updates.put(replica.getInternalId(), state.state);
@@ -298,6 +316,12 @@ public class ZkStateWriter {
}
}
} else {
+ updates = stateUpdates.get(collection);
+ if (updates == null) {
+ updates = new ConcurrentHashMap();
+ stateUpdates.put(collection, updates);
+ }
+
for (StateUpdate state : entry.getValue().values()) {
log.debug("Could not find existing collection name={}", collection);
String setState = Replica.State.shortStateToState(state.state).toString();
@@ -350,7 +374,7 @@ public class ZkStateWriter {
write(collection);
break;
} catch (KeeperException.BadVersionException e) {
-
+ log.warn("hit bad version trying to write state.json, trying again ...");
} catch (Exception e) {
log.error("write pending failed", e);
break;
@@ -409,7 +433,7 @@ public class ZkStateWriter {
if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
Integer finalVersion = collection.getZNodeVersion();
- dirtyStructure.remove(collection.getName());
+
if (reader == null) {
log.error("read not initialized in zkstatewriter");
}
@@ -422,15 +446,17 @@ public class ZkStateWriter {
stat = reader.getZkClient().setData(path, data, finalVersion, true, false);
collection.setZnodeVersion(finalVersion + 1);
-
+ dirtyStructure.remove(collection.getName());
if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), stat.getVersion());
} catch (KeeperException.NoNodeException e) {
log.debug("No node found for state.json", e);
} catch (KeeperException.BadVersionException bve) {
stat = reader.getZkClient().exists(path, null, false, false);
- log.info("Tried to update state.json ({}) with bad version {} \n {}", collection, finalVersion, stat != null ? stat.getVersion() : "null");
+ log.info("Tried to update state.json for {} with bad version {} found={} \n {}", coll, finalVersion, stat != null ? stat.getVersion() : "null", collection);
+ // TODO: likely we should be extra safe and assume bad things and force fetch the state.json to get a new DocCollection
+ collection.setZnodeVersion(stat.getVersion());
throw bve;
}
@@ -438,6 +464,7 @@ public class ZkStateWriter {
ConcurrentHashMap updates = stateUpdates.get(collection.getName());
if (updates != null) {
+ // TODO: clearing these correctly is tricky
updates.clear();
writeStateUpdates(collection, updates);
}
@@ -463,13 +490,14 @@ public class ZkStateWriter {
log.error("Failed processing update=" + collection, e);
}
- if (badVersionException.get() != null) {
- throw badVersionException.get();
- }
-
} finally {
collState.collLock.unlock();
}
+
+ if (badVersionException.get() != null) {
+ throw badVersionException.get();
+ }
+
}
private void writeStateUpdates(DocCollection collection, ConcurrentHashMap updates) throws KeeperException, InterruptedException {
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index 4c6ced7..73259a1 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -179,10 +179,10 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
@Override
public void close() throws IOException {
if (log.isTraceEnabled()) log.trace("close() - start");
-
+ closed = true;
synchronized (this) {
- closed = true;
+
if (log.isDebugEnabled()) log.debug("Closing {} - {} directories currently being tracked", this.getClass().getSimpleName(), byDirectoryCache.size());
Collection<CacheValue> values = new HashSet<>(byDirectoryCache.values());
for (CacheValue val : values) {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 5bab87c..4a18cd6 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -693,12 +693,6 @@ public class CoreContainer implements Closeable {
}
if (isZooKeeperAware()) {
- try {
- getZkController().publishNodeAs(getZkController().getNodeName(), OverseerAction.RECOVERYNODE);
- } catch (Exception e) {
- log.error("Failed publishing loading core as recovering", e);
- }
-
List<CoreDescriptor> removeCds = new ArrayList<>();
for (final CoreDescriptor cd : cds) {
@@ -719,13 +713,21 @@ public class CoreContainer implements Closeable {
} catch (Exception e) {
SolrException.log(log, "Failed to delete instance dir for core:" + cd.getName() + " dir:" + cd.getInstanceDir());
}
-
+ continue;
}
}
markCoreAsLoading(cd.getName());
String collection = cd.getCollectionName();
- getZkController().getZkStateReader().registerCore(collection, cd.getName());
-
+ try {
+ getZkController().getZkStateReader().registerCore(collection, cd.getName());
+ } catch (Exception e) {
+ log.error("Failed registering core with zkstatereader", e);
+ }
+ }
+ try {
+ getZkController().publishNodeAs(getZkController().getNodeName(), OverseerAction.RECOVERYNODE);
+ } catch (Exception e) {
+ log.error("Failed publishing loading core as recovering", e);
}
for (CoreDescriptor removeCd : removeCds) {
cds.remove(removeCd);
@@ -736,6 +738,8 @@ public class CoreContainer implements Closeable {
}
}
+
+ try {
// Always add $SOLR_HOME/lib to the shared resource loader
Set<String> libDirs = new LinkedHashSet<>();
libDirs.add("lib");
@@ -766,7 +770,7 @@ public class CoreContainer implements Closeable {
containerHandlers.getApiBag().registerObject(packageStoreAPI.readAPI);
containerHandlers.getApiBag().registerObject(packageStoreAPI.writeAPI);
- try {
+
logging = LogWatcher.newRegisteredLogWatcher(cfg.getLogWatcherConfig(), loader);
@@ -853,10 +857,7 @@ public class CoreContainer implements Closeable {
});
}
- } catch (Exception e) {
- log.error("Exception in CoreContainer load", e);
- throw new SolrException(ErrorCode.SERVER_ERROR, "Exception in CoreContainer load", e);
- }
+
if (!containerHandlers.keySet().contains(CORES_HANDLER_PATH)) {
throw new IllegalStateException("No core admin path was loaded " + CORES_HANDLER_PATH);
@@ -900,10 +901,6 @@ public class CoreContainer implements Closeable {
metricManager.loadClusterReporters(cfg.getMetricsConfig().getMetricReporters(), this);
}
- List<Future<SolrCore>> coreLoadFutures = null;
-
-
- coreLoadFutures = new ArrayList<>(cds.size());
if (isZooKeeperAware()) {
cds = CoreSorter.sortCores(this, cds);
}
@@ -915,6 +912,15 @@ public class CoreContainer implements Closeable {
zkSys.getZkController().createEphemeralLiveNode();
}
+ } catch (Exception e) {
+ log.error("Exception in CoreContainer load", e);
+ for (final CoreDescriptor cd : cds) {
+ markCoreAsNotLoading(cd.getName());
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Exception in CoreContainer load", e);
+ }
+ List<Future<SolrCore>> coreLoadFutures = null;
+ coreLoadFutures = new ArrayList<>(cds.size());
for (final CoreDescriptor cd : cds) {
if (log.isDebugEnabled()) log.debug("Process core descriptor {} {} {}", cd.getName(), cd.isTransient(), cd.isLoadOnStartup());
@@ -922,26 +928,6 @@ public class CoreContainer implements Closeable {
solrCores.addCoreDescriptor(cd);
}
- // MRM TODO: look at ids for this
-// if (isZooKeeperAware()) {
-// String collection = cd.getCollectionName();
-//
-// if (!zkSys.zkController.getClusterState().hasCollection(collection)) {
-// solrCores.markCoreAsNotLoading(cd);
-// try {
-// coresLocator.delete(this, cd);
-// } catch (Exception e) {
-// log.error("Exception deleting core.properties file for non existing collection", e);
-// }
-//
-// try {
-// unload(cd, cd.getName(),true, true, true);
-// } catch (Exception e) {
-// log.error("Exception unloading core for non existing collection", e);
-// }
-// continue;
-// }
-// }
if (cd.isLoadOnStartup()) {
startedLoadingCores = true;
@@ -949,14 +935,7 @@ public class CoreContainer implements Closeable {
SolrCore core = null;
MDCLoggingContext.setCoreName(cd.getName());
try {
- try {
-
- core = createFromDescriptor(cd, false);
-
- } finally {
- solrCores.markCoreAsNotLoading(cd);
- }
-
+ core = createFromDescriptor(cd, false);
} catch (AlreadyClosedException e){
log.warn("Will not finish creating and registering core={} because we are shutting down", cd.getName(), e);
} catch (Exception e){
@@ -1250,17 +1229,10 @@ public class CoreContainer implements Closeable {
throw new AlreadyClosedException("Will not register SolrCore with ZooKeeper, already closed");
}
- // if (isShutDown) {
- // core.close();
- // throw new IllegalStateException("This CoreContainer has been closed");
- // }
+ core.setName(cd.getName());
SolrCore old = solrCores.putCore(cd, core);
- /*
- * set both the name of the descriptor and the name of the
- * core, since the descriptors name is used for persisting.
- */
- core.setName(cd.getName());
+ markCoreAsNotLoading(cd.getName());
coreInitFailures.remove(cd.getName());
@@ -1494,6 +1466,7 @@ public class CoreContainer implements Closeable {
throw solrException;
} catch (Throwable t) {
log.error("Unable to create SolrCore", t);
+ solrCores.markCoreAsNotLoading(dcore);
SolrException e = new SolrException(ErrorCode.SERVER_ERROR, "JVM Error creating core [" + dcore.getName() + "]: " + t.getMessage(), t);
coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
solrCores.remove(dcore.getName());
@@ -1944,6 +1917,10 @@ public class CoreContainer implements Closeable {
if (cd == null) {
cd = solrCores.getCoreDescriptor(name);
}
+ if (name == null && cd != null) {
+ name = cd.getName();
+ }
+
SolrException exception = null;
try {
@@ -1982,9 +1959,10 @@ public class CoreContainer implements Closeable {
}
}
- SolrCore core;
-
- core = solrCores.remove(name);
+ SolrCore core = null;
+ if (name != null) {
+ core = solrCores.remove(name);
+ }
if (core != null) {
if (cd == null) {
cd = core.getCoreDescriptor();
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 34f028d..38496b4 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -106,7 +106,6 @@ import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.IndexFingerprint;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.update.SolrCoreState.IndexWriterCloser;
-import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
@@ -505,10 +504,15 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
public void setName(String v) {
+ if (v.equals(this.name)) {
+ return;
+ }
this.name = v;
this.logid = (v == null) ? "" : ("[" + v + "] ");
if (coreMetricManager != null) {
- coreMetricManager.afterCoreSetName();
+ ParWork.getRootSharedExecutor().submit(() -> {
+ coreMetricManager.afterCoreSetName();
+ });
}
}
@@ -736,7 +740,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
core = new SolrCore(coreContainer, getName(), coreConfig, cd, getDataDir(), updateHandler, solrDelPolicy, currentCore, true);
core.start();
// we open a new IndexWriter to pick up the latest config
- core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);
+ core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false, false);
// core.getSearcher(true, false, null, true);
success = true;
return core;
@@ -804,7 +808,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
* @deprecated Use of this method can only lead to race conditions. Try
* to actually obtain a lock instead.
*/
- @Deprecated private static boolean isWriterLocked (Directory directory) throws IOException {
+ @Deprecated private static boolean isWriterLocked(Directory directory) throws IOException {
try {
directory.obtainLock(IndexWriter.WRITE_LOCK_NAME).close();
return false;
@@ -822,13 +826,12 @@ public final class SolrCore implements SolrInfoBean, Closeable {
// Create the index if it doesn't exist.
if (!indexExists) {
log.debug("{}Solr index directory '{}' doesn't exist. Creating new index...", logid, indexDir);
-
- try (SolrIndexWriter writer = SolrIndexWriter
- .buildIndexWriter(this, "SolrCore.initIndex", indexDir, getDirectoryFactory(), true, getLatestSchema(), solrConfig.indexConfig, solrDelPolicy,
- codec, true)) {
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ RefCounted<IndexWriter> writer = getSolrCoreState().getIndexWriter(this, true);
+ IndexWriter iw = writer.get();
+ try {
+ iw.commit(); // readers need to see the segments file
+ } finally {
+ writer.decref();
}
}
@@ -1669,12 +1672,17 @@ public final class SolrCore implements SolrInfoBean, Closeable {
*
* @see #isClosed()
*/
- @Override public void close () {
+ @Override
+ public synchronized void close () {
int cref = refCount.get();
+ if (cref < 0) {
+ log.warn("Already closed " + cref);
+ return;
+ }
int count = refCount.decrementAndGet();
- if (count < -1) {
+ if (count < 0) {
refCount.set(-1);
log.warn("Already closed " + count);
return;
@@ -1739,7 +1747,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
int timeouts = 30;
- // MRM TODO: put this timeout in play again
+ // MRM TODO: put this timeout in play again?
TimeOut timeout = new TimeOut(timeouts, TimeUnit.SECONDS, TimeSource.NANO_TIME);
int cnt = 0;
while (!canBeClosed() || refCount.get() != -1) {
@@ -1775,19 +1783,6 @@ public final class SolrCore implements SolrInfoBean, Closeable {
return;
}
try {
- if (closing) {
- this.closing = true;
- while (!isClosed) {
- synchronized (closeAndWait) {
- try {
- closeAndWait.wait(500);
- } catch (InterruptedException e) {
-
- }
- }
- }
- return;
- }
if (log.isDebugEnabled()) log.debug("CLOSING SolrCore {}", logid);
assert ObjectReleaseTracker.release(this);
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index ae29756..db7ecb4 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -96,8 +96,6 @@ class SolrCores implements Closeable {
if (log.isDebugEnabled()) log.debug("Closing SolrCores");
this.closed = true;
- currentlyLoadingCores.clear();
-
Collection<SolrCore> coreList = new ArrayList<>();
TransientSolrCoreCache transientSolrCoreCache = getTransientCacheHandler();
@@ -135,6 +133,7 @@ class SolrCores implements Closeable {
}
});
+ currentlyLoadingCores.clear();
}
// Returns the old core if there was a core of the same name.
@@ -284,12 +283,14 @@ class SolrCores implements Closeable {
}
protected SolrCore remove(String name) {
+ currentlyLoadingCores.remove(name);
+
if (name == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Cannot unload non-existent core [null]");
}
if (log.isDebugEnabled()) log.debug("remove core from solrcores {}", name);
- currentlyLoadingCores.remove(name);
+
SolrCore ret = cores.remove(name);
residentDesciptors.remove(name);
// It could have been a newly-created core. It could have been a transient core. The newly-created cores
@@ -387,11 +388,6 @@ class SolrCores implements Closeable {
return cds;
}
- // cores marked as loading will block on getCore
- public void markCoreAsLoading(CoreDescriptor cd) {
- markCoreAsLoading(cd.getName());
- }
-
public void markCoreAsLoading(String name) {
if (getAllCoreNames().contains(name)) {
log.warn("Creating a core with existing name is not allowed {}", name);
@@ -441,7 +437,7 @@ class SolrCores implements Closeable {
while (isCoreLoading(core)) {
synchronized (loadingSignal) {
try {
- loadingSignal.wait(1000);
+ loadingSignal.wait(500);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
return;
@@ -454,10 +450,7 @@ class SolrCores implements Closeable {
}
public boolean isCoreLoading(String name) {
- if (currentlyLoadingCores.contains(name)) {
- return true;
- }
- return false;
+ return (currentlyLoadingCores.contains(name));
}
public TransientSolrCoreCache getTransientCacheHandler() {
@@ -472,6 +465,5 @@ class SolrCores implements Closeable {
public void closing() {
this.closed = true;
- currentlyLoadingCores.clear();
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 60be50b..01ad136 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -195,36 +195,6 @@ public class ZkContainer implements Closeable {
if (zkRun == null || zkRun.trim().length() == 0 || zkRun.lastIndexOf('/') < 0) return zkRun;
return zkRun.substring(0, zkRun.lastIndexOf('/'));
}
-
- public static volatile Predicate<CoreDescriptor> testing_beforeRegisterInZk;
-
- public Future registerInZk(final SolrCore core) {
- log.info("Register in ZooKeeper core={} liveNodes={}", core.getName(), zkController.getZkStateReader().getLiveNodes());
- CoreDescriptor cd = core.getCoreDescriptor(); // save this here - the core may not have it later
- Runnable r = () -> {
- MDCLoggingContext.setCoreName(core.getName());
- try {
- try {
- if (testing_beforeRegisterInZk != null) {
- boolean didTrigger = testing_beforeRegisterInZk.test(cd);
- if (log.isDebugEnabled()) {
- log.debug("{} pre-zk hook", (didTrigger ? "Ran" : "Skipped"));
- }
- }
-
- zkController.register(core.getName(), cd);
-
- } catch (Exception e) {
- log.error("Failed trying to register with zookeeper", e);
- }
- } finally {
- MDCLoggingContext.clear();
- }
- };
- // r.run();
- return ParWork.getRootSharedExecutor().submit(r); // ### expert usage
- //return null;
- }
public ZkController getZkController() {
return zkController;
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 8b388cc..007291a 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -152,7 +152,7 @@ public class IndexFetcher {
final ReplicationHandler replicationHandler;
- private volatile Date replicationStartTimeStamp;
+ private volatile long replicationStartTimeStamp;
private RTimer replicationTimer;
private final SolrCore solrCore;
@@ -950,7 +950,9 @@ public class IndexFetcher {
// must get the latest solrCore object because the one we have might be closed because of a reload
// todo stop keeping solrCore around
try (SolrCore core = solrCore.getCoreContainer().getCore(solrCore.getName())) {
- @SuppressWarnings({"rawtypes"})
+// testing
+// @SuppressWarnings({"rawtypes"}) SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+// core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
Future[] waitSearcher = new Future[1];
searcher = core.getSearcher(true, true, waitSearcher, true);
if (waitSearcher[0] != null) {
@@ -1598,21 +1600,21 @@ public class IndexFetcher {
@SuppressForbidden(reason = "Need currentTimeMillis for debugging/stats")
private void markReplicationStart() {
replicationTimer = new RTimer();
- replicationStartTimeStamp = new Date();
+ replicationStartTimeStamp = System.nanoTime();
}
private void markReplicationStop() {
- replicationStartTimeStamp = null;
+ replicationStartTimeStamp = 0;
replicationTimer = null;
}
Date getReplicationStartTimeStamp() {
- return replicationStartTimeStamp;
+ return new Date(TimeUnit.MILLISECONDS.convert(replicationStartTimeStamp, TimeUnit.NANOSECONDS));
}
long getReplicationTimeElapsed() {
long timeElapsed = 0;
- if (replicationStartTimeStamp != null)
+ if (replicationStartTimeStamp > 0)
timeElapsed = TimeUnit.SECONDS.convert((long) replicationTimer.getTime(), TimeUnit.MILLISECONDS);
return timeElapsed;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
index 7483a6a..b103f09 100644
--- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
+++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
@@ -38,6 +38,7 @@ import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.PluginBag;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrInfoBean;
+import org.apache.solr.handler.admin.PrepRecoveryOp;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequest;
@@ -226,7 +227,9 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo
ParWork.propagateInterrupt(e);
throw new AlreadyClosedException(e);
} catch (Exception e) {
- log.error("Exception handling request", e);
+ if (log.isDebugEnabled() && !(e instanceof PrepRecoveryOp.NotValidLeader)) {
+ log.error("Exception handling request", e);
+ }
if (req.getCore() != null) {
boolean isTragic = req.getCore().getCoreContainer().checkTragicException(req.getCore());
if (isTragic) {
@@ -255,6 +258,11 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo
}
}
+ if (e instanceof PrepRecoveryOp.NotValidLeader) {
+ isServerError = false;
+ incrementErrors = false;
+ }
+
rsp.setException(e);
if (incrementErrors) {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 260e8e0..1ce860c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -139,6 +139,9 @@ public class ColStatus {
case RECOVERING:
recoveringReplicas++;
break;
+ case BUFFERING:
+ recoveringReplicas++;
+ break;
case RECOVERY_FAILED:
recoveryFailedReplicas++;
break;
@@ -156,7 +159,7 @@ public class ColStatus {
sliceMap.add("routingRules", rules);
}
sliceMap.add("replicas", replicaMap);
- Replica leader = zkStateReader.getLeaderRetry(collection, s.getName(), 10000);
+ Replica leader = s.getLeader();
if (leader == null) { // pick the first one
leader = s.getReplicas().size() > 0 ? s.getReplicas().iterator().next() : null;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index b534a00..adf38e9 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -387,6 +387,9 @@ enum CoreAdminOperation implements CoreAdminOp {
public void execute(CallInfo it) throws Exception {
try {
fun.execute(it);
+ } catch (PrepRecoveryOp.NotValidLeader e) {
+ // No need to re-wrap; throw as-is.
+ throw e;
} catch (SolrException | InterruptedException e) {
log.error("Error handling CoreAdmin action", e);
if (e instanceof InterruptedException) {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index dee1675..827a002 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -19,6 +19,7 @@ package org.apache.solr.handler.admin;
import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
@@ -35,7 +36,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
+public class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
@@ -64,12 +65,12 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
LeaderElector leaderElector = it.handler.coreContainer.getZkController().getLeaderElector(leaderName);
if (leaderElector == null || !leaderElector.isLeader()) {
- throw new IllegalStateException("Not the valid leader (replica=" + leaderName + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
+ throw new NotValidLeader("Not the valid leader (replica=" + leaderName + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
" coll=" + collection);
}
if (waitForState == null) {
- log.info("Done checking leader:", cname);
+ log.info("Done checking leader:", leaderName);
return;
}
@@ -120,4 +121,11 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
log.error(error);
}
}
+
+ public static class NotValidLeader extends SolrException {
+
+ public NotValidLeader(String s) {
+ super(ErrorCode.BAD_REQUEST, s);
+ }
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index d5152ca..4831833 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -71,6 +71,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.admin.PrepRecoveryOp;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.ResultContext;
@@ -145,7 +146,7 @@ public class RealTimeGetComponent extends SearchComponent
if (req.getCore().getCoreContainer().isZooKeeperAware() && Boolean.parseBoolean(onlyIfLeader)) {
LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
if (leaderElector == null || !leaderElector.isLeader()) {
- throw new IllegalStateException("Not the valid leader (replica=" + req.getCore().getName() + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
+ throw new PrepRecoveryOp.NotValidLeader("Not the valid leader (replica=" + req.getCore().getName() + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
" coll=" + req.getCore().getCoreContainer().getZkController().getClusterState().getCollectionOrNull(req.getCore().getCoreDescriptor().getCollectionName()));
}
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index c9e803a..5da60fe 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -49,6 +49,7 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ContentStreamHandlerBase;
+import org.apache.solr.handler.admin.PrepRecoveryOp;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
@@ -233,7 +234,6 @@ public class HttpSolrCall {
solrReq.getContext().put(CoreContainer.class.getName(), cores);
requestType = RequestType.ADMIN;
action = ADMIN;
- ensureStatesAreAtLeastAtClient();
return;
}
@@ -266,7 +266,7 @@ public class HttpSolrCall {
}
if (core == null && log.isDebugEnabled()) {
- log.debug("tried to get core by name {} got {}, existing cores {} found={}", origCorename, core, cores.getAllCoreNames(), core != null);
+ log.debug("tried to get core by name {} got {}, existing cores {} loading={} found={}", origCorename, core, cores.getAllCoreNames(), cores.getLoadedCoreNames(), core != null);
}
if (core != null) {
@@ -350,10 +350,8 @@ public class HttpSolrCall {
solrReq = parser.parse(core, path, req);
}
-
invalidStates = checkStateVersionsAreValid(getCollectionsList(), queryParams.get(CloudSolrClient.STATE_VERSION));
- ensureStatesAreAtLeastAtClient();
addCollectionParamIfNeeded(getCollectionsList());
action = PROCESS;
@@ -370,34 +368,6 @@ public class HttpSolrCall {
action = PASSTHROUGH;
}
- private void ensureStatesAreAtLeastAtClient() throws InterruptedException, TimeoutException {
-// if (cores.isZooKeeperAware()) {
-// if (log.isDebugEnabled()) log.debug("State version for request is {}", queryParams.get(CloudSolrClient.STATE_VERSION));
-// Map<String,Integer> invalidStates = getStateVersions(queryParams.get(CloudSolrClient.STATE_VERSION));
-// if (invalidStates != null) {
-// Set<Map.Entry<String,Integer>> entries = invalidStates.entrySet();
-// for (Map.Entry<String,Integer> entry : entries) {
-// String collection = entry.getKey();
-// Integer version = entry.getValue();
-// if (log.isDebugEnabled()) log.debug("ensure states are at at least client version {} for collection {}", version, collection);
-// DocCollection docCollection = cores.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
-// if (docCollection != null && docCollection.getZNodeVersion() < version) {
-// cores.getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
-// if (collectionState == null) {
-// return false;
-// }
-// log.info("found server state version {}", collectionState.getZNodeVersion());
-// if (collectionState.getZNodeVersion() < version) {
-// return false;
-// }
-// return true;
-// });
-// }
-// }
-// }
-// }
- }
-
protected void autoCreateSystemColl(String corename) throws Exception {
if (core == null &&
SYSTEM_COLL.equals(corename) &&
@@ -656,7 +626,7 @@ public class HttpSolrCall {
default: return action;
}
} catch (Throwable ex) {
- if (shouldAudit(EventType.ERROR)) {
+ if (!(ex instanceof PrepRecoveryOp.NotValidLeader) && shouldAudit(EventType.ERROR)) {
cores.getAuditLoggerPlugin().doAudit(new AuditEvent(EventType.ERROR, ex, req));
}
sendError(ex);
@@ -794,14 +764,6 @@ public class HttpSolrCall {
listener.getInputStream().transferTo(response.getOutputStream());
-// try {
-// listener.await(60, TimeUnit.SECONDS); // MRM TODO: timeout
-// } catch (InterruptedException e) {
-// log.error("Interrupted waiting for proxy request");
-// } catch (TimeoutException e) {
-// throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Timeout proxying request");
-// }
-
if (failException.get() != null) {
sendError(failException.get());
}
@@ -1110,55 +1072,54 @@ public class HttpSolrCall {
return null;
}
- try {
- zkStateReader.waitForActiveCollection(collectionName, 10000, TimeUnit.MILLISECONDS, true, collection.getSlices().size(), collection.getReplicas().size(), false);
- } catch (Exception e) {
- log.warn("Did not find leaders for collection:" + collection.getName());
- }
-
if (isPreferLeader) {
List<Replica> leaderReplicas = collection.getLeaderReplicas(cores.getZkController().getNodeName());
log.debug("preferLeader leaderReplicas={}", leaderReplicas);
- SolrCore core = randomlyGetSolrCore(cores.getZkController().getZkStateReader().getLiveNodes(), leaderReplicas, true);
+ SolrCore core = randomlyGetSolrCore(leaderReplicas, true);
if (core != null) return core;
}
List<Replica> replicas = collection.getReplicas(cores.getZkController().getNodeName());
if (log.isDebugEnabled()) log.debug("replicas for node {} {}", replicas, cores.getZkController().getNodeName());
- SolrCore returnCore = randomlyGetSolrCore(cores.getZkController().getZkStateReader().getLiveNodes(), replicas, true);
+ SolrCore returnCore = randomlyGetSolrCore(replicas, true);
if (log.isDebugEnabled()) log.debug("returning core by collection {}", returnCore == null ? null : returnCore.getName());
return returnCore;
}
- private SolrCore randomlyGetSolrCore(Set<String> liveNodes, List<Replica> replicas, boolean checkActive) {
+ private SolrCore randomlyGetSolrCore(List<Replica> replicas, boolean checkActive) {
if (replicas != null) {
RandomIterator<Replica> it = new RandomIterator<>(random, replicas);
while (it.hasNext()) {
Replica replica = it.next();
- if (liveNodes.contains(replica.getNodeName())) {
- SolrCore core = checkProps(replica);
- if (core != null && checkActive && replica.getState() != Replica.State.ACTIVE) {
- try {
- cores.getZkController().getZkStateReader().waitForState(core.getCoreDescriptor().getCollectionName(), 1, TimeUnit.SECONDS, (liveNodes1, coll) -> {
- if (coll == null) {
- return false;
- }
- Replica rep = coll.getReplica(core.getName());
- if (rep == null) {
- return false;
- }
- if (rep.getState() != Replica.State.ACTIVE) {
- return false;
- }
+
+ SolrCore core = checkProps(replica);
+ if (core != null && checkActive) {
+ try {
+ cores.getZkController().getZkStateReader().waitForState(core.getCoreDescriptor().getCollectionName(), 1, TimeUnit.SECONDS, (liveNodes1, coll) -> {
+ if (coll == null) {
+ return false;
+ }
+ Replica rep = coll.getReplica(replica.getName());
+ if (rep == null) {
+ return false;
+ }
+ if (rep.getState() == Replica.State.ACTIVE) {
return true;
- });
- } catch (InterruptedException e) {
- } catch (TimeoutException e) { }
+ }
+ return false;
+ });
+ } catch (InterruptedException e) {
+ log.debug("interrupted waiting to see active replica");
+ return null;
+ } catch (TimeoutException e) {
+ log.debug("timeout waiting to see active replica {} {}", replica.getName(), replica.getState());
+ return null;
}
- if (core != null) return core;
+ return core;
}
}
}
+
return null;
}
@@ -1181,13 +1142,8 @@ public class HttpSolrCall {
if (docCollection == null) {
return null;
}
- Collection<Slice> slices = docCollection.getActiveSlices();
-
- if (slices.isEmpty()) {
- return null;
- }
- String coreUrl = getCoreUrl(slices);
+ String coreUrl = getCoreUrl(docCollection.getSlices());
if (log.isDebugEnabled()) {
log.debug("get remote core url returning {} for {} {}", coreUrl, collectionName, origCorename);
@@ -1203,7 +1159,9 @@ public class HttpSolrCall {
Collections.shuffle(randomizedReplicas, random);
for (Replica replica : randomizedReplicas) {
- if (cores.getZkController().zkStateReader.getLiveNodes().contains(replica.getNodeName())
+ log.debug("check replica {} with node name {} against live nodes {} with state {}",
+ replica.getName(), replica.getNodeName(), cores.getZkController().getZkStateReader().getLiveNodes(), replica.getState());
+ if (!replica.getNodeName().equals(cores.getZkController().getNodeName()) && cores.getZkController().zkStateReader.getLiveNodes().contains(replica.getNodeName())
&& replica.getState() == Replica.State.ACTIVE) {
coreUrl = replica.getCoreUrl();
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 85d1e48..db32fe1 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -124,13 +124,22 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
@Override
- public RefCounted<IndexWriter> getIndexWriter(SolrCore core)
+ public RefCounted<IndexWriter> getIndexWriter(SolrCore core) throws IOException {
+ return getIndexWriter(core, false);
+ }
+
+ @Override
+ public RefCounted<IndexWriter> getIndexWriter(SolrCore core, boolean createIndex)
throws IOException {
if (core != null && (!core.indexEnabled || core.readOnly)) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"Indexing is temporarily disabled");
}
+ if (core != null && core.isClosing()) {
+ throw new AlreadyClosedException();
+ }
+
boolean succeeded = false;
lock(iwLock.readLock());
try {
@@ -142,7 +151,10 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
if (refCntWriter == null) return null;
} else {
if (indexWriter == null) {
- indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
+ if (core != null && core.isClosing() || core.isClosed() || core.getCoreContainer().isShutDown()) {
+ throw new AlreadyClosedException();
+ }
+ indexWriter = createMainIndexWriter(core, createIndex,"DirectUpdateHandler2");
}
initRefCntWriter();
}
@@ -195,7 +207,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
// closes and opens index writers without any locking
- private void changeWriter(SolrCore core, boolean rollback, boolean openNewWriter) throws IOException {
+ private void changeWriter(SolrCore core, boolean rollback, boolean createIndex, boolean openNewWriter) throws IOException {
String coreName = core.getName();
// We need to null this so it picks up the new writer next get call.
@@ -224,7 +236,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
if (openNewWriter) {
- indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2");
+ indexWriter = createMainIndexWriter(core, createIndex, "DirectUpdateHandler2");
log.info("New IndexWriter is ready to be used.");
}
}
@@ -233,7 +245,17 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
public void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
lock(iwLock.writeLock());
try {
- changeWriter(core, rollback, true);
+ changeWriter(core, rollback, false, true);
+ } finally {
+ iwLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void newIndexWriter(SolrCore core, boolean rollback, boolean createIndex) throws IOException {
+ lock(iwLock.writeLock());
+ try {
+ changeWriter(core, rollback, createIndex, true);
} finally {
iwLock.writeLock().unlock();
}
@@ -242,14 +264,14 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
@Override
public void closeIndexWriter(SolrCore core, boolean rollback) throws IOException {
lock(iwLock.writeLock());
- changeWriter(core, rollback, false);
+ changeWriter(core, rollback, false,false);
// Do not unlock the writeLock in this method. It will be unlocked by the openIndexWriter call (see base class javadoc)
}
@Override
public void openIndexWriter(SolrCore core) throws IOException {
try {
- changeWriter(core, false, true);
+ changeWriter(core, false, false, true);
} finally {
iwLock.writeLock().unlock(); //unlock even if we failed
}
@@ -259,16 +281,16 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
public void rollbackIndexWriter(SolrCore core) throws IOException {
iwLock.writeLock().lock();
try {
- changeWriter(core, true, true);
+ changeWriter(core, true, false, true);
} finally {
iwLock.writeLock().unlock();
}
}
- protected SolrIndexWriter createMainIndexWriter(SolrCore core, String name) throws IOException {
+ protected SolrIndexWriter createMainIndexWriter(SolrCore core, boolean createIndex, String name) throws IOException {
SolrIndexWriter iw;
try {
- iw = SolrIndexWriter.buildIndexWriter(core, name, core.getNewIndexDir(), core.getDirectoryFactory(), false, core.getLatestSchema(),
+ iw = SolrIndexWriter.buildIndexWriter(core, name, core.getNewIndexDir(), core.getDirectoryFactory(), createIndex, core.getLatestSchema(),
core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec(), false);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
@@ -484,9 +506,9 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// a blocking race, we should not need to
// though
// iwLock.writeLock().lock();
- if (recoverying) {
- cancelRecovery(false, true);
- }
+
+ cancelRecovery(false, true);
+
try {
closeIndexWriter(closer);
} finally {
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 3094740..4923f47 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -781,7 +781,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@Override
public void newIndexWriter(boolean rollback) throws IOException {
- solrCoreState.newIndexWriter(core, rollback);
+ solrCoreState.newIndexWriter(core, rollback, false);
}
/**
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 1bfe076..e8e2dd7 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -24,6 +24,7 @@ import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
@@ -123,7 +124,7 @@ public class SolrCmdDistributor implements Closeable {
err.t);
// this can happen in certain situations such as close
- if (isRetry && rspCode != -1) {
+ if (isRetry) {
// if it's a io exception exception, lets try again
if (err.t instanceof SolrServerException) {
if (((SolrServerException) err.t).getRootCause() instanceof IOException && !(((SolrServerException) err.t).getRootCause() instanceof ClosedChannelException)) {
@@ -135,13 +136,13 @@ public class SolrCmdDistributor implements Closeable {
doRetry = true;
}
- if (err.req.retries < maxRetries && doRetry && !isClosed.isClosed()) {
+ if (err.req.retries.get() < maxRetries && doRetry && !isClosed.isClosed()) {
try {
- Thread.sleep(100);
+ Thread.sleep(10);
} catch (InterruptedException e) {
}
- err.req.retries++;
+ err.req.retries.incrementAndGet();
SolrException.log(SolrCmdDistributor.log, "sending update to "
+ oldNodeUrl + " failed - retrying ... retries: "
@@ -168,9 +169,9 @@ public class SolrCmdDistributor implements Closeable {
RollupRequestReplicationTracker rollupTracker,
LeaderRequestReplicationTracker leaderTracker) throws IOException {
if (nodes == null) return;
-// if (!cmd.isDeleteById()) {
-// blockAndDoRetries();
-// }
+ if (!cmd.isDeleteById()) {
+ blockAndDoRetries();
+ }
for (Node node : nodes) {
if (node == null) continue;
UpdateRequest uReq = new UpdateRequest();
@@ -215,7 +216,7 @@ public class SolrCmdDistributor implements Closeable {
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
ModifiableSolrParams params) {
// we need to do any retries before commit...
- //blockAndDoRetries();
+ blockAndDoRetries();
if (log.isDebugEnabled()) {
log.debug("Distrib commit to: {} params: {}", nodes, params);
}
@@ -350,7 +351,7 @@ public class SolrCmdDistributor implements Closeable {
public static class Req {
public Node node;
public UpdateRequest uReq;
- public int retries;
+ public AtomicInteger retries;
public boolean synchronous;
public UpdateCommand cmd;
final private RollupRequestReplicationTracker rollupTracker;
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index ff874c9..df78e89 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -106,9 +106,10 @@ public abstract class SolrCoreState {
* @param rollback close IndexWriter if false, else rollback
* @throws IOException If there is a low-level I/O error.
*/
+ public abstract void newIndexWriter(SolrCore core, boolean rollback, boolean createIndex) throws IOException;
+
public abstract void newIndexWriter(SolrCore core, boolean rollback) throws IOException;
-
/**
* Expert method that closes the IndexWriter - you must call {@link #openIndexWriter(SolrCore)}
* in a finally block after calling this method.
@@ -127,14 +128,22 @@ public abstract class SolrCoreState {
* @throws IOException If there is a low-level I/O error.
*/
public abstract void openIndexWriter(SolrCore core) throws IOException;
-
+
/**
* Get the current IndexWriter. If a new IndexWriter must be created, use the
* settings from the given {@link SolrCore}.
- *
+ *
* @throws IOException If there is a low-level I/O error.
*/
public abstract RefCounted<IndexWriter> getIndexWriter(SolrCore core) throws IOException;
+
+ /**
+ * Get the current IndexWriter. If a new IndexWriter must be created, use the
+ * settings from the given {@link SolrCore}.
+ *
+ * @throws IOException If there is a low-level I/O error.
+ */
+ public abstract RefCounted<IndexWriter> getIndexWriter(SolrCore core, boolean createIndex) throws IOException;
/**
* Rollback the current IndexWriter. When creating the new IndexWriter use the
@@ -201,7 +210,7 @@ public abstract class SolrCoreState {
public abstract Lock getRecoveryLock();
public Throwable getTragicException() throws IOException {
- RefCounted<IndexWriter> ref = getIndexWriter(null);
+ RefCounted<IndexWriter> ref = getIndexWriter(null, false);
if (ref == null) return null;
try {
return ref.get().getTragicException();
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index e59b421..c993671 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -164,8 +164,30 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
Replica leaderReplica;
+
+
+
+ DocCollection coll = clusterState.getCollection(collection);
+
+ Slice slice = coll.getSlice(desc.getCloudDescriptor().getShardId());
+
+ String shardId = slice.getName();
+
try {
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, desc.getCloudDescriptor().getShardId(), 1000);
+
+ // Not equivalent to getLeaderProps, which retries to find a leader.
+ leaderReplica = slice.getLeader();
+ if (leaderReplica == null) {
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 10000);
+ } else {
+ isLeader = leaderReplica.getName().equals(desc.getName());
+ if (isLeader) {
+ LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
+ if (leaderElector == null || !leaderElector.isLeader()) {
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(req.getCore().getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(), collection, shardId, 10000, true);
+ }
+ }
+ }
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
@@ -231,9 +253,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
}
if (removeNode != null) {
+ log.debug("remove leader node since we will do a local commit now {}", leaderReplica);
useNodes.remove(removeNode);
sendCommitToReplicasAndLocalCommit(cmd, worker, leaderReplica.getName(), params);
+ if (log.isDebugEnabled()) log.debug("processCommit(CommitUpdateCommand) - end");
}
}
@@ -288,7 +312,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
worker.collect("localCommit", () -> {
try {
doLocalCommit(cmd);
- } catch (IOException e) {
+ } catch (Exception e) {
+ log.error("Failed local leader commit", e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
});
@@ -605,8 +630,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
throw new SolrException(ErrorCode.SERVER_ERROR, "error getting leader", e);
}
// DBQ forwarded to NRT and TLOG replicas
+ Set<Replica.State> matchFilters = new HashSet<>(3);
+ matchFilters.add(Replica.State.BUFFERING);
+ matchFilters.add(Replica.State.RECOVERING);
+ matchFilters.add(Replica.State.ACTIVE);
List<Replica> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, myShardId, leaderReplica.getName(), Replica.State.BUFFERING, Replica.State.ACTIVE, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ .getReplicaProps(collection, myShardId, leaderReplica.getName(), matchFilters, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
if (replicaProps != null) {
final List<SolrCmdDistributor.Node> myReplicas = new ArrayList<>(replicaProps.size());
for (Replica replicaProp : replicaProps) {
@@ -648,8 +677,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// TODO: what if we are no longer the leader?
forwardToLeader = false;
+ Set<Replica.State> matchFilters = new HashSet<>(3);
+ matchFilters.add(Replica.State.BUFFERING);
+ matchFilters.add(Replica.State.RECOVERING);
+ matchFilters.add(Replica.State.ACTIVE);
List<Replica> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, shardId, name, Replica.State.BUFFERING, Replica.State.ACTIVE, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ .getReplicaProps(collection, shardId, name, matchFilters, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
if (replicaProps != null) {
nodes = new ArrayList<>(replicaProps.size());
for (Replica props : replicaProps) {
@@ -775,13 +808,23 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
Replica leaderReplica;
try {
- doDefensiveChecks(phase);
-
// Not equivalent to getLeaderProps, which retries to find a leader.
leaderReplica = slice.getLeader();
-// leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 10000);
+ if (leaderReplica == null) {
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(req.getCore().getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(), collection, shardId, 10000, true);
+ } else {
+ isLeader = leaderReplica.getName().equals(desc.getName());
+ if (isLeader) {
+ LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
+ if (leaderElector == null || !leaderElector.isLeader()) {
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(req.getCore().getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient(), collection, shardId, 10000, true);
+ }
+ }
+ }
isLeader = leaderReplica != null && leaderReplica.getName().equals(desc.getName());
+ doDefensiveChecks(phase);
+
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
if (isSubShardLeader) {
@@ -827,7 +870,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
- for (Replica replica: replicas) {
+ for (Replica replica : replicas) {
String coreNodeName = replica.getName();
if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
if (log.isInfoEnabled()) {
@@ -838,14 +881,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
log.info("skip url:{} cause its term is less than leader", replica.getCoreUrl());
skippedCoreNodeNames.add(replica.getName());
- } else if (!zkController.getZkStateReader().getLiveNodes().contains(replica.getNodeName()) || (replica.getState() != Replica.State.ACTIVE &&
- replica.getState() != Replica.State.BUFFERING)) {
+ } else if (!zkController.getZkStateReader().getLiveNodes().contains(replica.getNodeName()) || (replica.getState() == Replica.State.DOWN)) {
skippedCoreNodeNames.add(replica.getName());
} else {
nodes.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), replica, collection, shardId, maxRetriesToFollowers));
}
}
- if (log.isDebugEnabled()) log.debug("We are the leader {}, forward update to replicas.. {} {}", req.getCore().getName(), nodes);
+ if (log.isDebugEnabled()) log.debug("We are the leader {}, forward update to replicas.. {}", req.getCore().getName(), nodes);
return nodes;
} else {
@@ -1154,7 +1196,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
DocCollection docCollection = clusterState.getCollection(collection);
Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
- if (DistribPhase.TOLEADER == phase) {
+ if (isLeader || DistribPhase.TOLEADER == phase) {
LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
if (leaderElector == null || !leaderElector.isLeader()) {
throw new IllegalStateException(
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index e590710..2a40466 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -120,7 +120,7 @@ public class TestInjection {
public volatile static String randomDelayInCoreCreation = null;
- public volatile static int randomDelayMaxInCoreCreationInSec = 10;
+ public volatile static int randomDelayMaxInCoreCreationInSec = 5;
public volatile static String splitFailureBeforeReplicaCreation = null;
diff --git a/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java b/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
index 26bcc01..83d5c19 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AssignBackwardCompatibilityTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestUtil;
@@ -58,7 +59,7 @@ public class AssignBackwardCompatibilityTest extends SolrCloudTestCase {
}
@Test
- public void test() throws IOException, SolrServerException, KeeperException, InterruptedException {
+ public void test() throws IOException, SolrServerException, KeeperException, InterruptedException, TimeoutException {
Set<String> coreNames = new HashSet<>();
int numOperations = random().nextInt(15) + 15;
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index ac2be7f..b7f7e67 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -52,8 +52,8 @@ import java.nio.file.Path;
@LuceneTestCase.Nightly // MRM TODO: - check out more, convert to bridge
@Ignore // MRM TODO: convert to bridge
public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
- private static final String SHARD2 = "shard2";
- private static final String SHARD1 = "shard1";
+ private static final String SHARD2 = "s2";
+ private static final String SHARD1 = "s1";
private static final String ONE_NODE_COLLECTION = "onenodecollection";
private final boolean onlyLeaderIndexes = random().nextBoolean();
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
index 0815c69..7a5e1ce 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
@@ -175,6 +175,13 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
if (collectionState == null) return false;
Collection<Slice> slices = collectionState.getSlices();
for (Slice slice : slices) {
+
+ try {
+ cluster.getSolrClient().getZkStateReader().getLeaderRetry(COLLECTION, slice.getName(), 5000, true);
+ } catch (Exception e) {
+ log.error("exception waiting for leaders", e);
+ return false;
+ }
for (Replica replica : slice.getReplicas()) {
if (cluster.getSolrClient().getZkStateReader().isNodeLive(replica.getNodeName())) {
if (replica.getState() != Replica.State.ACTIVE) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
index 3d0b339..c637fc1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
@@ -20,12 +20,16 @@ import org.apache.solr.SolrTestUtil;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
// MRM TODO: - speed this up - waits for zkwriter to see its own state after delete
public class CollectionStateZnodeTest extends SolrCloudTestCase {
@@ -54,9 +58,21 @@ public class CollectionStateZnodeTest extends SolrCloudTestCase {
Stat stat = new Stat();
zkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat);
- DocCollection c = getCollectionState(collectionName);
+ // the state.json itself can be ahead of the local DocCollection version due to state updates filling it in
+ try {
+ cluster.getSolrClient().getZkStateReader().waitForState(collectionName, 3, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ if (collectionState.getZNodeVersion() != stat.getVersion() && !collectionState.getStateUpdates().get("_cs_ver_").equals(Integer.toString(stat.getVersion()))) {
+ return false;
+ }
+ return true;
+ });
+ } catch (TimeoutException e) {
+ fail("failed finding state in DocCollection that appears up to date with " + stat.getVersion());
+ }
- assertEquals("DocCollection version should equal the znode version", stat.getVersion(), c.getZNodeVersion() );
// remove collection
CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 45be93a..b59505b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -89,7 +89,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
System.setProperty("solr.zkclienttimeout", "15000");
System.setProperty("zkClientTimeout", "15000");
- System.setProperty("solr.getleader.looptimeout", "10000");
String timeout = "640000";
System.setProperty("solr.http2solrclient.default.idletimeout", timeout);
System.setProperty("distribUpdateSoTimeout", timeout);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ConfigSetsAPITest.java b/solr/core/src/test/org/apache/solr/cloud/ConfigSetsAPITest.java
index 7de4fdb..36b7a26 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ConfigSetsAPITest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ConfigSetsAPITest.java
@@ -68,8 +68,7 @@ public class ConfigSetsAPITest extends SolrCloudTestCase {
}
@Test
- @LuceneTestCase.Nightly // TODO speedup
- @Ignore // MRM TODO:
+ // @LuceneTestCase.Nightly // TODO speedup
public void testSharedSchema() throws Exception {
CollectionAdminRequest.createCollection("col1", "cShare", 1, 1)
.processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
@@ -80,10 +79,10 @@ public class ConfigSetsAPITest extends SolrCloudTestCase {
CoreContainer coreContainer = cluster.getJettySolrRunner(0).getCoreContainer();
- try (SolrCore coreCol1 = coreContainer.getCore("col1_s1_r1");
- SolrCore coreCol2 = coreContainer.getCore("col2_s1_r1");
- SolrCore coreCol3 = coreContainer.getCore("col3_s1_r1")) {
- assertSame(coreCol1.getLatestSchema(), coreCol2.getLatestSchema());
+ try (SolrCore coreCol1 = coreContainer.getCore("col1_s1_r_n1");
+ SolrCore coreCol2 = coreContainer.getCore("col2_s1_r_n1");
+ SolrCore coreCol3 = coreContainer.getCore("col3_s1_r_n1")) {
+ assertSame(coreContainer.getAllCoreNames().toString(), coreCol1.getLatestSchema(), coreCol2.getLatestSchema());
assertNotSame(coreCol1.getLatestSchema(), coreCol3.getLatestSchema());
}
@@ -92,8 +91,8 @@ public class ConfigSetsAPITest extends SolrCloudTestCase {
SolrTestCaseJ4.map("collection.configName", "conf1") // from cShare
).processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT);
- try (SolrCore coreCol1 = coreContainer.getCore("col1_shard1_replica_n1");
- SolrCore coreCol2 = coreContainer.getCore("col2_shard1_replica_n1")) {
+ try (SolrCore coreCol1 = coreContainer.getCore("col1_s1_r_n1");
+ SolrCore coreCol2 = coreContainer.getCore("col2_s1_r_n1")) {
assertNotSame(coreCol1.getLatestSchema(), coreCol2.getLatestSchema());
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
index 5b5d462..c9265e7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
@@ -94,7 +94,7 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
// CoreAdminRequest.Create createRequest = new CoreAdminRequest.Create();
// createRequest.setCoreName("testcore");
// createRequest.setCollection(collectionName);
-// createRequest.setShardId("shard2");
+// createRequest.setShardId("s2");
// queryClient.request(createRequest);
// });
// assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains("No coreNodeName for"));
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java
index 3bec335..b9c2365 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java
@@ -24,7 +24,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.util.StrUtils;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +33,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
-@Ignore // MRM TODO: flakey
public class DeleteNodeTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -68,14 +66,14 @@ public class DeleteNodeTest extends SolrCloudTestCase {
CollectionAdminRequest.createCollection(coll, "conf1", 5, 2, 0, 0),
CollectionAdminRequest.createCollection(coll, "conf1", 5, 2, 1, 0)
);
- create = create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(20);
+ create = create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(20).waitForFinalState(true);
cloudClient.request(create);
state = cloudClient.getZkStateReader().getClusterState();
String node2bdecommissioned = l.get(0);
// check what replicas are on the node, and whether the call should fail
//new CollectionAdminRequest.DeleteNode(node2bdecommissioned).processAsync("003", cloudClient);
- new CollectionAdminRequest.DeleteNode(node2bdecommissioned).process(cloudClient);
+ new CollectionAdminRequest.DeleteNode(node2bdecommissioned).waitForFinalState(true).process(cloudClient);
// CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("003");
CollectionAdminRequest.RequestStatusResponse rsp = null;
// if (shouldFail) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 4a506ed..5a5b462 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -49,12 +49,10 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZkStateReaderAccessor;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.ZkContainer;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -268,12 +266,10 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
@Test
@LuceneTestCase.Slow
// commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
- @LuceneTestCase.Nightly // TODO look at performance of this - need lower connection timeouts for test?
- @Ignore // MRM TODO:
+ //@LuceneTestCase.Nightly // TODO look at performance of this - need lower connection timeouts for test?
public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
final String collectionName = "raceDeleteReplicaCollection";
- CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
- .process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2).waitForFinalState(true).process(cluster.getSolrClient());
Slice shard1 = getCollectionState(collectionName).getSlice("s1");
Replica leader = shard1.getLeader();
@@ -288,70 +284,56 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
Semaphore waitingForReplicaGetDeleted = new Semaphore(0);
// for safety, we only want this hook get triggered one time
AtomicInteger times = new AtomicInteger(0);
- ZkContainer.testing_beforeRegisterInZk = cd -> {
- if (cd.getCloudDescriptor() == null) return false;
- if (replica1.getName().equals(cd.getName())
- && collectionName.equals(cd.getCloudDescriptor().getCollectionName())) {
- if (times.incrementAndGet() > 1) {
- return false;
- }
- log.info("Running delete core {}",cd);
-
- try {
- ZkNodeProps m = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
- ZkStateReader.CORE_NAME_PROP, replica1.getName(),
- ZkStateReader.NODE_NAME_PROP, replica1.getNodeName(),
- ZkStateReader.COLLECTION_PROP, collectionName);
- cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
-
- boolean replicaDeleted = false;
- TimeOut timeOut = new TimeOut(25, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- while (!timeOut.hasTimedOut()) {
- try {
- ZkStateReader stateReader = replica1Jetty.getCoreContainer().getZkController().getZkStateReader();
- Slice shard = stateReader.getClusterState().getCollection(collectionName).getSlice("s1");
- if (shard.getReplicas().size() == 1) {
- replicaDeleted = true;
- waitingForReplicaGetDeleted.release();
- break;
- }
- Thread.sleep(250);
- } catch (NullPointerException | SolrException e) {
- log.error("", e);
- Thread.sleep(250);
- }
+ try {
+ ZkController.testing_beforeRegisterInZk = cd -> {
+ if (cd.getCloudDescriptor() == null) return false;
+ if (replica1.getName().equals(cd.getName()) && collectionName.equals(cd.getCloudDescriptor().getCollectionName())) {
+ if (times.incrementAndGet() > 1) {
+ return false;
}
- if (!replicaDeleted) {
- fail("Timeout for waiting replica get deleted");
+ log.info("Running delete core {}", cd);
+
+ try {
+
+ CollectionAdminRequest.DeleteReplica deleteReplica = CollectionAdminRequest.deleteReplica(collectionName, replica1.getSlice(), replica1.getName());
+ deleteReplica.setAsyncId("async1");
+ deleteReplica.process(cluster.getSolrClient(), collectionName);
+
+ cluster.getSolrClient().getZkStateReader().waitForState(collectionName, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ if (collectionState.getReplica(replica1.getName()) != null) {
+ return false;
+ }
+ waitingForReplicaGetDeleted.release();
+ return true;
+ });
+
+ } catch (Exception e) {
+ log.error("", e);
+ fail("Failed to delete replica");
+ } finally {
+ //avoiding deadlock
+ waitingForReplicaGetDeleted.release();
}
- } catch (Exception e) {
- log.error("", e);
- fail("Failed to delete replica");
- } finally {
- //avoiding deadlock
- waitingForReplicaGetDeleted.release();
+ return true;
}
- return true;
- }
- return false;
- };
+ return false;
+ };
- try {
replica1Jetty.stop();
- waitForState("Expected replica:"+replica1+" get down", collectionName, (liveNodes, collectionState)
- -> collectionState.getSlice("s1").getReplica(replica1.getName()).getState() == DOWN);
+ waitForState("Expected replica:" + replica1 + " get down", collectionName, (liveNodes, collectionState) -> collectionState.getSlice("s1").getReplica(replica1.getName()).getState() == DOWN);
replica1Jetty.start();
waitingForReplicaGetDeleted.acquire();
} finally {
- ZkContainer.testing_beforeRegisterInZk = null;
+ ZkController.testing_beforeRegisterInZk = null;
}
TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Timeout adding replica to shard", () -> {
try {
- CollectionAdminRequest.addReplicaToShard(collectionName, "s1")
- .process(cluster.getSolrClient());
+ CollectionAdminRequest.addReplicaToShard(collectionName, "s1").process(cluster.getSolrClient());
return true;
} catch (Exception e) {
// expected, when the node is not fully started
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index 7740fa2..0aafd41 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -16,14 +16,12 @@
*/
package org.apache.solr.cloud;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestUtil;
-import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreStatus;
@@ -129,7 +127,7 @@ public class DeleteShardTest extends SolrCloudTestCase {
@Test
// commented 4-Sep-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 09-Aug-2018
- public void testDirectoryCleanupAfterDeleteShard() throws InterruptedException, IOException, SolrServerException {
+ public void testDirectoryCleanupAfterDeleteShard() throws Exception {
final String collection = "deleteshard_test";
CollectionAdminRequest.createCollectionWithImplicitRouter(collection, "conf", "a,b,c", 1)
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java
index 97a2f72..27087d0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java
@@ -28,7 +28,6 @@ import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
public class DeleteStatusTest extends SolrCloudTestCase {
@@ -111,7 +110,6 @@ public class DeleteStatusTest extends SolrCloudTestCase {
}
@Test
- @Ignore // MRM TODO: - once I changed how requests from queue were deleted, this popped up as a race issue
public void testDeleteStatusFlush() throws Exception {
final CloudHttp2SolrClient client = cluster.getSolrClient();
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
index aae4227..6aa5905 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
@@ -101,7 +101,7 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
}
// let's put the leader in its own partition, no replicas can contact it now
- Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "s1");
if (log.isInfoEnabled()) {
log.info("Creating partition to leader at {}", leader.getCoreUrl());
}
@@ -109,12 +109,12 @@ public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
leaderProxy.close();
// let's find the leader of shard2 and ask him to commit
- Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2");
+ Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "s2");
sendCommitWithRetry(shard2Leader);
Thread.sleep(sleepMsBeforeHealPartition);
- leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "s1");
assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
if (log.isInfoEnabled()) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionContextKeyTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionContextKeyTest.java
index 131193c..9e6b005 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionContextKeyTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionContextKeyTest.java
@@ -59,10 +59,10 @@ public class LeaderElectionContextKeyTest extends SolrCloudTestCase {
.setCreateNodeSet("")
.process(cluster.getSolrClient());
CollectionAdminRequest
- .addReplicaToShard("testCollection"+i, "shard1")
+ .addReplicaToShard("testCollection"+i, "s1")
.process(cluster.getSolrClient());
CollectionAdminRequest
- .addReplicaToShard("testCollection"+i, "shard2")
+ .addReplicaToShard("testCollection"+i, "s2")
.process(cluster.getSolrClient());
}
}
@@ -75,8 +75,8 @@ public class LeaderElectionContextKeyTest extends SolrCloudTestCase {
// The test assume that TEST_COLLECTION_1 and TEST_COLLECTION_2 will have identical layout
// ( same replica's name on every shard )
for (int i = 1; i <= 2; i++) {
- String coll1ShardiLeader = clusterState.getCollection(TEST_COLLECTION_1).getLeader("shard"+i).getName();
- String coll2ShardiLeader = clusterState.getCollection(TEST_COLLECTION_2).getLeader("shard"+i).getName();
+ String coll1ShardiLeader = clusterState.getCollection(TEST_COLLECTION_1).getLeader("s"+i).getName();
+ String coll2ShardiLeader = clusterState.getCollection(TEST_COLLECTION_2).getLeader("s"+i).getName();
String assertMss = String.format(Locale.ROOT, "Expect %s and %s each have a replica with same name on shard %s",
coll1ShardiLeader, coll2ShardiLeader, "shard"+i);
assertEquals(
@@ -92,8 +92,8 @@ public class LeaderElectionContextKeyTest extends SolrCloudTestCase {
try (SolrClient shardLeaderClient = new HttpSolrClient.Builder(replica.get("base_url").toString()).build()) {
assertEquals(1L, getElectionNodes(TEST_COLLECTION_1, shard, stateReader.getZkClient()).size());
- List<String> collection2Shard1Nodes = getElectionNodes(TEST_COLLECTION_2, "shard1", stateReader.getZkClient());
- List<String> collection2Shard2Nodes = getElectionNodes(TEST_COLLECTION_2, "shard2", stateReader.getZkClient());
+ List<String> collection2Shard1Nodes = getElectionNodes(TEST_COLLECTION_2, "s1", stateReader.getZkClient());
+ List<String> collection2Shard2Nodes = getElectionNodes(TEST_COLLECTION_2, "s2", stateReader.getZkClient());
CoreAdminRequest.unloadCore(replica.getName(), shardLeaderClient);
// Waiting for leader election being kicked off
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
@@ -108,8 +108,8 @@ public class LeaderElectionContextKeyTest extends SolrCloudTestCase {
}
assertTrue(found);
// There are no leader election was kicked off on testCollection2
- assertThat(collection2Shard1Nodes, CoreMatchers.is(getElectionNodes(TEST_COLLECTION_2, "shard1", stateReader.getZkClient())));
- assertThat(collection2Shard2Nodes, CoreMatchers.is(getElectionNodes(TEST_COLLECTION_2, "shard2", stateReader.getZkClient())));
+ assertThat(collection2Shard1Nodes, CoreMatchers.is(getElectionNodes(TEST_COLLECTION_2, "s1", stateReader.getZkClient())));
+ assertThat(collection2Shard2Nodes, CoreMatchers.is(getElectionNodes(TEST_COLLECTION_2, "s2", stateReader.getZkClient())));
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
index 0b5d63c..099b3a7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
@@ -231,11 +231,11 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
// "http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "");
// ZkController zkController = MockSolrSource.makeSimpleMock(null, null, zkClient);
// ElectionContext context = new ShardLeaderElectionContextBase(elector,
-// "shard2", "collection1", "dummynode1", props, zkController);
+// "s2", "collection1", "dummynode1", props, zkController);
// elector.setup(context);
// elector.joinElection(context, false);
// assertEquals("http://127.0.0.1/solr/",
-// getLeaderUrl("collection1", "shard2"));
+// getLeaderUrl("collection1", "s2"));
// }
// MRM TODO:
diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
index b68dcf0..610e9e6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
@@ -165,7 +165,7 @@ public class MigrateRouteKeyTest extends SolrCloudTestCase {
waitForState("Expected to find routing rule for split key " + splitKey, "sourceCollection", (n, c) -> {
if (c == null)
return false;
- Slice shard = c.getSlice("shard2");
+ Slice shard = c.getSlice("s2");
if (shard == null)
return false;
if (shard.getRoutingRules() == null || shard.getRoutingRules().isEmpty())
@@ -175,7 +175,7 @@ public class MigrateRouteKeyTest extends SolrCloudTestCase {
return true;
});
- boolean ruleRemoved = waitForRuleToExpire("sourceCollection", "shard2", splitKey, finishTime);
+ boolean ruleRemoved = waitForRuleToExpire("sourceCollection", "s2", splitKey, finishTime);
assertTrue("Routing rule was not expired", ruleRemoved);
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
index 1cdac23..03ed7e5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -50,7 +50,6 @@ import org.apache.solr.common.util.TimeSource;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.util.TimeOut;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
index cd593d8..889b0b5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
@@ -25,33 +25,33 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
// See SOLR-6640
@SolrTestCaseJ4.SuppressSSL
@LuceneTestCase.Nightly
-@Ignore // MRM TODO: proxy not working right?
public class RecoveryAfterSoftCommitTest extends SolrCloudBridgeTestCase {
private static final int MAX_BUFFERED_DOCS = 2, ULOG_NUM_RECORDS_TO_KEEP = 2;
public RecoveryAfterSoftCommitTest() {
+
+ }
+
+ @BeforeClass
+ public static void beforeTests() {
sliceCount = 1;
numJettys = 2;
replicationFactor = 2;
enableProxy = true;
uploadSelectCollection1Config = true;
+ solrconfigString = "solrconfig.xml";
+ schemaString = "schema.xml";
System.setProperty("solr.tests.maxBufferedDocs", String.valueOf(MAX_BUFFERED_DOCS));
System.setProperty("solr.ulog.numRecordsToKeep", String.valueOf(ULOG_NUM_RECORDS_TO_KEEP));
// avoid creating too many files, see SOLR-7421
System.setProperty("useCompoundFile", "true");
}
- @BeforeClass
- public static void beforeTests() {
-
- }
-
@AfterClass
public static void afterTest() {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
index 684df7f..33f2316 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
@@ -177,11 +177,11 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
Set<Integer> byIDs;
byIDs = getSomeIds(2);
sendNonDirectDeletesRequestReplicaWithRetry(leader,
- byIDs, calcByIdRf(byIDs, testCollectionName, "shard2"),
+ byIDs, calcByIdRf(byIDs, testCollectionName, "s2"),
getSomeIds(2), 1, testCollectionName);
byIDs = getSomeIds(2);
sendNonDirectDeletesRequestReplicaWithRetry(replicas.get(0), byIDs,
- calcByIdRf(byIDs, testCollectionName, "shard2"),
+ calcByIdRf(byIDs, testCollectionName, "s2"),
getSomeIds(2), 1, testCollectionName);
// heal the partition
getProxyForReplica(shard2Replicas.get(0)).reopen();
diff --git a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
index f1db9fb..f96122b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cloud;
+import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
@@ -184,16 +185,46 @@ public abstract class SolrCloudBridgeTestCase extends SolrCloudTestCase {
}
if (uploadSelectCollection1Config) {
+ String path = null;
try {
- zkClient.create("/configs/_default/solrconfig.snippet.randomindexconfig.xml", TEST_PATH.resolve("collection1").resolve("conf").resolve("solrconfig.snippet.randomindexconfig.xml").toFile(),
- CreateMode.PERSISTENT, true);
- zkClient.create("/configs/_default/enumsConfig.xml", TEST_PATH.resolve("collection1").resolve("conf").resolve("enumsConfig.xml").toFile(), CreateMode.PERSISTENT, true);
- zkClient.create("/configs/_default/currency.xml", TEST_PATH.resolve("collection1").resolve("conf").resolve("currency.xml").toFile(), CreateMode.PERSISTENT, true);
- zkClient.create("/configs/_default/old_synonyms.txt", TEST_PATH.resolve("collection1").resolve("conf").resolve("old_synonyms.txt").toFile(), CreateMode.PERSISTENT, true);
- zkClient.create("/configs/_default/open-exchange-rates.json", TEST_PATH.resolve("collection1").resolve("conf").resolve("open-exchange-rates.json").toFile(), CreateMode.PERSISTENT, true);
- zkClient.create("/configs/_default/mapping-ISOLatin1Accent.txt", TEST_PATH.resolve("collection1").resolve("conf").resolve("mapping-ISOLatin1Accent.txt").toFile(), CreateMode.PERSISTENT, true);
+ path = "/configs/_default/solrconfig.snippet.randomindexconfig.xml";
+ zkClient.create(path, TEST_PATH.resolve("collection1").resolve("conf").resolve(new File(path).getName()).toFile(), CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException exists) {
- log.info("extra collection config files already exist in zk");
+ log.info("extra collection config file already exist in zk {}", path);
+ }
+
+ try {
+ path = "/configs/_default/enumsConfig.xml";
+ zkClient.create(path, TEST_PATH.resolve("collection1").resolve("conf").resolve(new File(path).getName()).toFile(), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException exists) {
+ log.info("extra collection config file already exist in zk {}", path);
+ }
+
+ try {
+ path = "/configs/_default/currency.xml";
+ zkClient.create(path, TEST_PATH.resolve("collection1").resolve("conf").resolve(new File(path).getName()).toFile(), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException exists) {
+ log.info("extra collection config file already exist in zk {}", path);
+ }
+
+ try {
+ path = "/configs/_default/old_synonyms.txt";
+ zkClient.create(path, TEST_PATH.resolve("collection1").resolve("conf").resolve(new File(path).getName()).toFile(), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException exists) {
+ log.info("extra collection config file already exist in zk {}", path);
+ }
+
+ try {
+ path = "/configs/_default/open-exchange-rates.json";
+ zkClient.create(path, TEST_PATH.resolve("collection1").resolve("conf").resolve(new File(path).getName()).toFile(), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException exists) {
+ log.info("extra collection config file already exist in zk {}", path);
+ }
+ try {
+ path = "/configs/_default/mapping-ISOLatin1Accent.txt";
+ zkClient.create(path, TEST_PATH.resolve("collection1").resolve("conf").resolve(new File(path).getName()).toFile(), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException exists) {
+ log.info("extra collection config file already exist in zk {}", path);
}
}
@@ -428,24 +459,8 @@ public abstract class SolrCloudBridgeTestCase extends SolrCloudTestCase {
return Integer.parseInt(tmp);
}
- protected Replica getShardLeader(String testCollectionName, String shardId, int timeoutSecs) throws Exception {
- Replica leader = null;
- long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
- while (System.nanoTime() < timeout) {
- Replica tmp = null;
- try {
- tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
- } catch (Exception exc) {}
- if (tmp != null && State.ACTIVE == tmp.getState()) {
- leader = tmp;
- break;
- }
- Thread.sleep(250);
- }
- assertNotNull("Could not find active leader for " + shardId + " of " +
- testCollectionName + " after "+timeoutSecs+" secs;", leader);
-
- return leader;
+ protected Replica getShardLeader(String testCollectionName, String shardId, int timeoutms) throws Exception {
+ return cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId, timeoutms, true);
}
protected JettySolrRunner getJettyOnPort(int port) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
index a5fb602..abd1569 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
@@ -84,7 +84,7 @@ public class SplitShardTest extends SolrCloudTestCase {
}
@Test
- public void doTest() throws IOException, SolrServerException {
+ public void doTest() throws Exception {
CollectionAdminRequest
.createCollection(COLLECTION_NAME, "conf", 2, 1)
.setMaxShardsPerNode(100)
diff --git a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
index 2a1d35a..4550ade 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
@@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
@@ -37,6 +38,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* Test sync phase that occurs when Leader goes down and a new Leader is
@@ -73,7 +75,7 @@ public class SyncSliceTest extends SolrCloudBridgeTestCase {
handle.clear();
handle.put("timestamp", SKIPVAL);
- // waitForThingsToLevelOut(30, TimeUnit.SECONDS);
+ cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, false, 1, numJettys, true, true);
List<JettySolrRunner> skipServers = new ArrayList<>();
int docId = 0;
@@ -126,6 +128,7 @@ public class SyncSliceTest extends SolrCloudBridgeTestCase {
// kill the leader - new leader could have all the docs or be missing one
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(COLLECTION, "s1", 10000)));
+ log.info("Stopping leader jetty {}", leaderJetty.getBaseUrl());
skipServers = getRandomOtherJetty(leaderJetty, null); // but not the leader
@@ -144,7 +147,11 @@ public class SyncSliceTest extends SolrCloudBridgeTestCase {
int cnt = 0;
while (deadJetty == leaderJetty) {
// updateMappingsFromZk(this.jettys, this.clients);
- leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(COLLECTION, "s1", 5)));
+ try {
+ leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(COLLECTION, "s1", 1000)));
+ } catch (SolrException e) {
+ log.info("did not get leader", e);
+ }
if (deadJetty == leaderJetty) {
Thread.sleep(500);
}
@@ -157,7 +164,7 @@ public class SyncSliceTest extends SolrCloudBridgeTestCase {
deadJetty.start(); // he is not the leader anymore
log.info("numJettys=" + numJettys);
- cluster.waitForActiveCollection(COLLECTION, 1, numJettys);
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, false, 1, numJettys, true, true);
skipServers = getRandomOtherJetty(leaderJetty, deadJetty);
skipServers.addAll(getRandomOtherJetty(leaderJetty, deadJetty));
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
index 2e6678e..105642c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
@@ -25,6 +25,7 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.SolrTestUtil;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -46,12 +47,13 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
useFactory(null);
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
- configureCluster(2)
+ configureCluster(3)
.addConfig("config", SolrTestUtil.TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
+ // 2 replicas will not ensure we don't lose an update here, so 3
CollectionAdminRequest
- .createCollection(COLLECTION, "config", 1,2)
+ .createCollection(COLLECTION, "config", 1,3)
.setMaxShardsPerNode(100)
.waitForFinalState(true)
.process(cluster.getSolrClient());
@@ -66,26 +68,40 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
public void test() throws Exception {
JettySolrRunner node1 = cluster.getJettySolrRunner(0);
JettySolrRunner node2 = cluster.getJettySolrRunner(1);
- try (Http2SolrClient client1 = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl().toString())) {
+
+ try (Http2SolrClient client1 = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl())) {
node2.stop();
- cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
- cluster.waitForActiveCollection(COLLECTION, 1, 1, true);
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
+ Thread.sleep(250);
+ cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, false, 1, 2, true, true);
UpdateRequest req = new UpdateRequest();
for (int i = 0; i < 100; i++) {
req = req.add("id", i+"", "num", i+"");
}
- req.commit(client1, COLLECTION);
- node2.start();
+ try {
+ req.commit(client1, COLLECTION);
+ } catch (BaseHttpSolrClient.RemoteSolrException e) {
+ Thread.sleep(250);
+ try {
+ req.commit(client1, COLLECTION);
+ } catch (BaseHttpSolrClient.RemoteSolrException e2) {
+ Thread.sleep(500);
+ req.commit(client1, COLLECTION);
+ }
+ }
- cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
+ node2.start();
- cluster.waitForActiveCollection(COLLECTION, 1, 2, true);
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 3);
+ Thread.sleep(250);
+ log.info("wait for active collection before query");
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, false, 1, 3, true, true);
- try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
+ try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl())) {
long numFound = client.query(COLLECTION, new SolrQuery("q","*:*", "distrib", "false")).getResults().getNumFound();
assertEquals(100, numFound);
}
@@ -95,24 +111,50 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
new UpdateRequest().add("id", "1", "num", "10")
.commit(client1, COLLECTION);
- Object v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "true")).getResults().get(0).get("num");
- assertEquals("10", v.toString());
-
+ // can be stale (eventually consistent) but should catch up
+ for (int i = 0; i < 30; i++) {
+ try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl())) {
+ Object v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "true")).getResults().get(0).get("num");
+ try {
+ assertEquals("10 i="+ i, "10", v.toString());
+ break;
+ } catch (AssertionError error) {
+ if (i == 29) {
+ throw error;
+ }
+ Thread.sleep(100);
+ }
+ }
+ }
- v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
+ Object v = client1.query(COLLECTION, new SolrQuery("q", "id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("10", v.toString());
- try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
- v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "true")).getResults().get(0).get("num");
- assertEquals("10", v.toString());
+ // can be stale (eventually consistent) but should catch up
+ for (int i = 0; i < 30; i ++) {
+ try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl())) {
+ v = client.query(COLLECTION, new SolrQuery("q", "id:1", "distrib", "true")).getResults().get(0).get("num");
+ try {
+ assertEquals("node requested=" + node2.getBaseUrl() + " 10 i="+ i, "10", v.toString());
+ break;
+ } catch (AssertionError error) {
+ if (i == 29) {
+ throw error;
+ }
+ Thread.sleep(100);
+ }
+ }
}
//
node2.stop();
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
Thread.sleep(250);
+ log.info("wait for active collection before query");
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, false, 1, 2, true, true);
new UpdateRequest().add("id", "1", "num", "20")
.commit(client1, COLLECTION);
@@ -122,9 +164,10 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
node2.start();
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 3);
Thread.sleep(250);
-
- cluster.waitForActiveCollection(COLLECTION, 1, 2);
+ log.info("wait for active collection before query");
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, false, 1, 3, true, true);
try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
@@ -134,32 +177,56 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
node2.stop();
- Thread.sleep(250);
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
+
+ log.info("wait for active collection before query");
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, false, 1, 2, true, true);
new UpdateRequest().add("id", "1", "num", "30")
.commit(client1, COLLECTION);
- v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
- SolrTestCaseJ4.assertEquals("30", v.toString());
+
+
+ for (int i = 0; i < 30; i ++) {
+ try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl())) {
+ v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
+ try {
+ SolrTestCaseJ4.assertEquals("30", v.toString());
+ break;
+ } catch (AssertionError error) {
+ if (i == 29) {
+ throw error;
+ }
+ Thread.sleep(100);
+ }
+ }
+ }
+
node2.start();
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 3);
+
Thread.sleep(250);
- cluster.waitForActiveCollection(COLLECTION, 1, 2);
+ cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, false, 1, 3, true, true);
- try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
+ try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl())) {
v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
- Replica oldLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(COLLECTION,"s1");
-
+ Replica oldLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(COLLECTION,"s1", 5000, true);
node1.stop();
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
+ Thread.sleep(250);
+ cluster.waitForActiveCollection(COLLECTION, 5, TimeUnit.SECONDS, false, 1, 2, true, true);
+
+ Replica newLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(COLLECTION,"s1", 5000, true);
if (oldLeader.getNodeName().equals(node1.getNodeName())) {
waitForState("", COLLECTION, (liveNodes, collectionState) -> {
@@ -170,15 +237,17 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
node1.start();
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 3);
Thread.sleep(250);
+ cluster.getSolrClient().getZkStateReader().getZkClient().printLayout();
- cluster.waitForActiveCollection(COLLECTION, 1, 2);
+ cluster.waitForActiveCollection(COLLECTION, 10, TimeUnit.SECONDS, false, 1, 3, true, true);
- try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl().toString())) {
+ try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl())) {
Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
- try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
+ try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl())) {
Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java b/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java
index ef2e6c0..2087f54 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java
@@ -41,8 +41,8 @@ public class TestDistribDocBasedVersion extends SolrCloudBridgeTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- String bucket1 = "shard1"; // shard1: top bits:10 80000000:ffffffff
- String bucket2 = "shard2"; // shard2: top bits:00 00000000:7fffffff
+ String bucket1 = "s1"; // shard1: top bits:10 80000000:ffffffff
+ String bucket2 = "s2"; // shard2: top bits:00 00000000:7fffffff
private static String vfield = "my_version_l";
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestDownShardTolerantSearch.java b/solr/core/src/test/org/apache/solr/cloud/TestDownShardTolerantSearch.java
index a699fd4..240eec1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestDownShardTolerantSearch.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestDownShardTolerantSearch.java
@@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ShardParams;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
import static org.hamcrest.CoreMatchers.is;
import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
/**
* Test which asserts that shards.tolerant=true works even if one shard is down
@@ -43,15 +45,22 @@ public class TestDownShardTolerantSearch extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
- public static void setupCluster() throws Exception {
+ public static void beforeTestDownShardTolerantSearch() throws Exception {
configureCluster(2).addConfig("conf", SolrTestUtil.configset("cloud-minimal")).configure();
+ CollectionAdminRequest.createCollection("tolerant", "conf", 2, 1).waitForFinalState(true).process(cluster.getSolrClient());
+ cluster.getSolrClient().getZkStateReader().waitForActiveCollection("tolerant", 5, TimeUnit.SECONDS, false, 2, 2, true, true);
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
+ }
+
+ @AfterClass
+ public static void afterTestDownShardTolerantSearch() throws Exception {
+ cluster.deleteAllCollections();
+ shutdownCluster();
}
@Test
public void searchingShouldFailWithoutTolerantSearchSetToTrue() throws Exception {
- CollectionAdminRequest.createCollection("tolerant", "conf", 2, 1).waitForFinalState(true).process(cluster.getSolrClient());
-
UpdateRequest update = new UpdateRequest();
for (int i = 0; i < 100; i++) {
update.add("id", Integer.toString(i));
@@ -66,17 +75,19 @@ public class TestDownShardTolerantSearch extends SolrCloudTestCase {
cluster.waitForJettyToStop(stoppedServer);
- try (SolrClient client = cluster.buildSolrClient()) {
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
- response = client.query("tolerant", new SolrQuery("*:*").setRows(1).setParam(ShardParams.SHARDS_TOLERANT, true));
+ SolrClient client = cluster.getSolrClient();
- assertThat(response.getStatus(), is(0));
- assertTrue(response.getResults().getNumFound() > 0);
+ response = client.query("tolerant", new SolrQuery("*:*").setRows(1).setParam(ShardParams.SHARDS_TOLERANT, true));
- Exception e = SolrTestCaseUtil.expectThrows(Exception.class, "Request should have failed because we killed shard1 jetty",
- () -> cluster.getSolrClient().query("tolerant", new SolrQuery("*:*").setRows(1).setParam(ShardParams.SHARDS_TOLERANT, false)));
+ assertThat(response.getStatus(), is(0));
+ assertTrue(response.getResults().getNumFound() > 0);
+
+ Exception e = SolrTestCaseUtil.expectThrows(Exception.class, "Request should have failed because we killed shard1 jetty",
+ () -> cluster.getSolrClient().query("tolerant", new SolrQuery("*:*").setRows(1).setParam(ShardParams.SHARDS_TOLERANT, false)));
+
+ assertNotNull(e);
- assertNotNull(e);
- }
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java b/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java
index 5ddeb39..278adb9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestHashPartitioner.java
@@ -253,7 +253,7 @@ public class TestHashPartitioner extends SolrTestCaseJ4 {
// shard3: 00
// shard4: 01
- String[] highBitsToShard = {"shard3","shard4","shard1","shard2"};
+ String[] highBitsToShard = {"shard3","shard4","s1","s2"};
for (int i = 0; i<26; i++) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index deed435..6906dfd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -408,6 +408,7 @@ public class TestPullReplica extends SolrCloudTestCase {
} else {
leaderJetty = cluster.getReplicaJetty(s.getLeader());
leaderJetty.stop();
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
waitForState("Leader replica not removed", collectionName, clusterShape(1, 1));
// Wait for cluster state to be updated
waitForState("Replica state not updated in cluster state",
@@ -454,8 +455,11 @@ public class TestPullReplica extends SolrCloudTestCase {
CollectionAdminRequest.addReplicaToShard(collectionName, "s1", Replica.Type.NRT).waitForFinalState(true).process(cluster.getSolrClient());
} else {
leaderJetty.start();
+
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
}
+
SolrTestCaseJ4.unIgnoreException("No registered leader was found"); // Should have a leader from now on
// Validate that the new nrt replica is the leader now
@@ -478,7 +482,11 @@ public class TestPullReplica extends SolrCloudTestCase {
// add docs agin
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"));
s = docCollection.getSlices().iterator().next();
- try (Http2SolrClient leaderClient = SolrTestCaseJ4.getHttpSolrClient(s.getLeader().getCoreUrl())) {
+
+
+ leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, s.getName());
+
+ try (Http2SolrClient leaderClient = SolrTestCaseJ4.getHttpSolrClient(leader.getCoreUrl())) {
leaderClient.commit();
SolrDocumentList results = leaderClient.query(new SolrQuery("*:*")).getResults();
assertEquals(results.toString(), 2, results.getNumFound());
@@ -500,15 +508,29 @@ public class TestPullReplica extends SolrCloudTestCase {
JettySolrRunner pullReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("s1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
pullReplicaJetty.stop();
- waitForState("Replica not removed", collectionName, activeReplicaCount(1, 0, 0));
- // Also wait for the replica to be placed in state="down"
- waitForState("Didn't not live state", collectionName, notLive(Replica.Type.PULL));
+
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
+
+ cluster.getSolrClient().getZkStateReader().waitForState(collectionName, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ if (!activeReplicaCount(1, 0, 0).matches(liveNodes, collectionState)) {
+ return false;
+ }
+ if (!notLive(Replica.Type.PULL).matches(liveNodes, collectionState)) {
+ return false;
+ }
+ return true;
+ });
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar"));
cluster.getSolrClient().commit(collectionName);
waitForNumDocsInAllActiveReplicas(2);
pullReplicaJetty.start();
+
+ cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 2);
waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
waitForNumDocsInAllActiveReplicas(2);
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistClusterPerZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistClusterPerZkTest.java
index 8a0873a..4dd1724 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistClusterPerZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistClusterPerZkTest.java
@@ -75,6 +75,8 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Tests the Cloud Collections API.
@@ -442,7 +444,26 @@ public class CollectionsAPIDistClusterPerZkTest extends SolrCloudTestCase {
req = CollectionAdminRequest.addReplicaToShard(collectionName, "s1").withProperty(CoreAdminParams.INSTANCE_DIR, instancePath.toString());
req.setWaitForFinalState(true);
response = req.process(cluster.getSolrClient());
- newReplica = grabNewReplica(response, getCollectionState(collectionName));
+ String replicaName = response.getCollectionCoresStatus().keySet().iterator().next();
+ AtomicReference<Replica> theReplica = new AtomicReference<>();
+ try {
+ cluster.getSolrClient().getZkStateReader().waitForState(collectionName, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ Replica replica = collectionState.getReplica(replicaName);
+ if (replica != null) {
+ theReplica.set(replica);
+ return true;
+ }
+ return false;
+ });
+ } catch (TimeoutException e) {
+ log.error("timeout",e);
+ throw new TimeoutException("timeout waiting to see " + replicaName);
+ }
+
+ newReplica = theReplica.get();
assertNotNull(newReplica);
try (Http2SolrClient coreclient = SolrTestCaseJ4.getHttpSolrClient(newReplica.getBaseUrl())) {
@@ -459,8 +480,22 @@ public class CollectionsAPIDistClusterPerZkTest extends SolrCloudTestCase {
req.setWaitForFinalState(true);
response = req.process(cluster.getSolrClient());
- newReplica = grabNewReplica(response, getCollectionState(collectionName));
- // MRM TODO: do we really want to support this anymore?
+ AtomicReference<Replica> theReplica2 = new AtomicReference<>();
+ cluster.getSolrClient().getZkStateReader().waitForState(collectionName, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ Replica replica = collectionState.getReplica(replicaName);
+ if (replica != null) {
+ theReplica2.set(replica);
+ return true;
+ }
+ return false;
+ });
+
+ newReplica = theReplica2.get();
+ assertNotNull(theReplica2);
+ // MRM TODO: do we really want to support this anymore? We really should control core names for cloud
// assertEquals("'core' should be 'propertyDotName' " + newReplica.getName(), "propertyDotName", newReplica.getName());
}
diff --git a/solr/core/src/test/org/apache/solr/core/CoreSorterTest.java b/solr/core/src/test/org/apache/solr/core/CoreSorterTest.java
index f9b5967..b2e9810 100644
--- a/solr/core/src/test/org/apache/solr/core/CoreSorterTest.java
+++ b/solr/core/src/test/org/apache/solr/core/CoreSorterTest.java
@@ -24,8 +24,8 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreSorter.CountsForEachShard;
-import org.junit.Ignore;
import org.junit.Test;
import static org.mockito.Mockito.mock;
@@ -34,13 +34,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-@Ignore // MRM TODO: this mock test needs updating after dropping the separate solrdispatchfilter zkclient
public class CoreSorterTest extends SolrTestCaseJ4 {
private static final List<CountsForEachShard> inputCounts = Arrays.asList(
@@ -104,6 +104,7 @@ public class CoreSorterTest extends SolrTestCaseJ4 {
Map<String,DocCollection> collToState = new HashMap<>();
Map<CountsForEachShard, List<CoreDescriptor>> myCountsToDescs = new HashMap<>();
+ long id = 0;
for (Map.Entry<String, List<CountsForEachShard>> entry : collToCounts.entrySet()) {
String collection = entry.getKey();
List<CountsForEachShard> collCounts = entry.getValue();
@@ -125,7 +126,7 @@ public class CoreSorterTest extends SolrTestCaseJ4 {
Map<String, Replica> replicaMap = replicas.stream().collect(Collectors.toMap(Replica::getName, Function.identity()));
sliceMap.put(slice, new Slice(slice, replicaMap, map(), collection, -1l, nodeName -> "http://" + nodeName));
}
- DocCollection col = new DocCollection(collection, sliceMap, map(), DocRouter.DEFAULT);
+ DocCollection col = new DocCollection(collection, sliceMap, map("id", id++), DocRouter.DEFAULT);
collToState.put(collection, col);
}
// reverse map
@@ -143,6 +144,8 @@ public class CoreSorterTest extends SolrTestCaseJ4 {
{
when(mockCC.isZooKeeperAware()).thenReturn(true);
+ ZkStateReader mockZkReader= mock(ZkStateReader.class);
+ when(mockZkReader.getLiveNodes()).thenReturn(new HashSet<>(liveNodes));
ZkController mockZKC = mock(ZkController.class);
when(mockCC.getZkController()).thenReturn(mockZKC);
{
@@ -154,6 +157,8 @@ public class CoreSorterTest extends SolrTestCaseJ4 {
}
}
}
+ when(mockZKC.getCoreContainer()).thenReturn(mockCC);
+ when(mockZKC.getZkStateReader()).thenReturn(mockZkReader);
NodeConfig mockNodeConfig = mock(NodeConfig.class);
when(mockNodeConfig.getNodeName()).thenReturn(thisNode);
diff --git a/solr/core/src/test/org/apache/solr/core/ExitableDirectoryReaderTest.java b/solr/core/src/test/org/apache/solr/core/ExitableDirectoryReaderTest.java
index 6a32a86..d916ce2 100644
--- a/solr/core/src/test/org/apache/solr/core/ExitableDirectoryReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/core/ExitableDirectoryReaderTest.java
@@ -25,7 +25,6 @@ import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import static org.apache.solr.common.util.Utils.fromJSONString;
@@ -96,7 +95,6 @@ public class ExitableDirectoryReaderTest extends SolrTestCaseJ4 {
// removed once it is running and this test should be un-ignored and the assumptions verified.
// With all the weirdness, I'm not going to vouch for this test. Feel free to change it.
@Test
- @Ignore // MRM TODO: - maybe needs a force update
public void testCacheAssumptions() throws Exception {
String fq= "name:d*";
SolrCore core = h.getCore();
@@ -139,7 +137,6 @@ public class ExitableDirectoryReaderTest extends SolrTestCaseJ4 {
// When looking at a problem raised on the user's list I ran across this anomaly with timeAllowed
// This tests for the second query NOT returning partial results, along with some other
@Test
- @Ignore // MRM TODO: - maybe needs a force update
public void testQueryResults() throws Exception {
String q = "name:e*";
SolrCore core = h.getCore();
diff --git a/solr/core/src/test/org/apache/solr/core/SolrCoreCheckLockOnStartupTest.java b/solr/core/src/test/org/apache/solr/core/SolrCoreCheckLockOnStartupTest.java
index 1dd41b7..60c3ca2 100644
--- a/solr/core/src/test/org/apache/solr/core/SolrCoreCheckLockOnStartupTest.java
+++ b/solr/core/src/test/org/apache/solr/core/SolrCoreCheckLockOnStartupTest.java
@@ -25,7 +25,6 @@ import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +34,6 @@ import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.util.Map;
-@Ignore // MRM TODO:
public class SolrCoreCheckLockOnStartupTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/test/org/apache/solr/core/TestBadConfig.java b/solr/core/src/test/org/apache/solr/core/TestBadConfig.java
index e3d168b..8f57362 100644
--- a/solr/core/src/test/org/apache/solr/core/TestBadConfig.java
+++ b/solr/core/src/test/org/apache/solr/core/TestBadConfig.java
@@ -47,8 +47,6 @@ public class TestBadConfig extends AbstractBadConfigTestBase {
"useCompoundFile");
}
- @Ignore // this fails because a small change - currently, a SolrCore failing in CoreContainer#load will
- // not fail with an exception, though the exception will be logged - we should check the core init exceptions here
public void testUpdateLogButNoVersionField() throws Exception {
System.setProperty("enable.update.log", "true");
diff --git a/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java b/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java
index 8167d6f..0dfeccd 100644
--- a/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java
+++ b/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java
@@ -18,7 +18,6 @@ package org.apache.solr.core;
import java.io.IOException;
import java.util.Map;
-import java.util.NoSuchElementException;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
@@ -38,7 +37,6 @@ import org.apache.solr.schema.SchemaField;
import org.apache.solr.util.TestHarness;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import javax.xml.xpath.XPathExpressionException;
@@ -164,29 +162,36 @@ public class TestCodecSupport extends SolrTestCaseJ4 {
doTestCompressionMode("best_compression", "BEST_COMPRESSION");
}
- @Ignore // MRM TODO: - this test can be flakey after the explicit reload below - some race ...?
public void testMixedCompressionMode() throws Exception {
System.setProperty("tests.COMPRESSION_MODE", "BEST_SPEED");
h.getCoreContainer().reload(h.coreName);
assertU(add(doc("string_f", "1", "text", "foo bar")));
assertU(commit());
- assertCompressionMode("BEST_SPEED", h.getCore());
+ try (SolrCore core = h.getCore()) {
+ assertCompressionMode("BEST_SPEED", core);
+ }
System.setProperty("tests.COMPRESSION_MODE", "BEST_COMPRESSION");
h.getCoreContainer().reload(h.coreName);
assertU(add(doc("string_f", "2", "text", "foo zar")));
assertU(commit());
- assertCompressionMode("BEST_COMPRESSION", h.getCore());
+ try (SolrCore core = h.getCore()) {
+ assertCompressionMode("BEST_COMPRESSION", core);
+ }
System.setProperty("tests.COMPRESSION_MODE", "BEST_SPEED");
h.getCoreContainer().reload(h.coreName);
assertU(add(doc("string_f", "3", "text", "foo zoo")));
assertU(commit());
- assertCompressionMode("BEST_SPEED", h.getCore());
+ try (SolrCore core = h.getCore()) {
+ assertCompressionMode("BEST_SPEED", core);
+ }
assertQ(req("q", "*:*"),
"//*[@numFound='3']");
assertQ(req("q", "text:foo"),
"//*[@numFound='3']");
assertU(optimize("maxSegments", "1"));
- assertCompressionMode("BEST_SPEED", h.getCore());
+ try (SolrCore core = h.getCore()) {
+ assertCompressionMode("BEST_SPEED", core);
+ }
System.clearProperty("tests.COMPRESSION_MODE");
}
diff --git a/solr/core/src/test/org/apache/solr/core/TestCustomStream.java b/solr/core/src/test/org/apache/solr/core/TestCustomStream.java
index f931a91..4cffeba 100644
--- a/solr/core/src/test/org/apache/solr/core/TestCustomStream.java
+++ b/solr/core/src/test/org/apache/solr/core/TestCustomStream.java
@@ -28,7 +28,6 @@ import org.junit.Test;
/**
* Created by caomanhdat on 6/3/16.
*/
-@Ignore // MRM TODO: debug
public class TestCustomStream extends AbstractFullDistribZkTestBase {
@Test
@@ -47,12 +46,13 @@ public class TestCustomStream extends AbstractFullDistribZkTestBase {
Arrays.asList("overlay", "expressible", "hello", "class"),
"org.apache.solr.core.HelloStream",10);
- TestSolrConfigHandler.testForResponseElement(client,
- null,
- "/stream?expr=hello()",
- null,
- Arrays.asList("result-set", "docs[0]", "msg"),
- "Hello World!",10);
+// MRM TODO:
+// TestSolrConfigHandler.testForResponseElement(client,
+// null,
+// "/stream?expr=hello()",
+// null,
+// Arrays.asList("result-set", "docs[0]", "msg"),
+// "Hello World!",10);
}
diff --git a/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java b/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
index cee5f62..2cddbc9 100644
--- a/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
+++ b/solr/core/src/test/org/apache/solr/core/TestJmxIntegration.java
@@ -183,7 +183,7 @@ public class TestJmxIntegration extends SolrTestCaseJ4 {
ObjectName name = nameFactory.createName("gauge", registryName, "SEARCHER.searcher.numDocs");
- timeout = new TimeOut(1000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ timeout = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
Integer oldNumDocs = null;
while (!timeout.hasTimedOut()) {
nameFactory.createName("gauge", registryName, "SEARCHER.searcher.numDocs");
diff --git a/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java b/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
index f71d0a5..e5d6cd2 100644
--- a/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
+++ b/solr/core/src/test/org/apache/solr/core/TestSolrConfigHandler.java
@@ -609,6 +609,7 @@ public class TestSolrConfigHandler extends RestTestBase {
while (TimeUnit.SECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) < maxTimeoutSeconds) {
try {
m = testServerBaseUrl == null ? getRespMap(uri, harness) : TestSolrConfigHandlerConcurrent.getAsMap(testServerBaseUrl + uri, cloudSolrClient);
+ log.info("response is {}", m);
} catch (Exception e) {
continue;
diff --git a/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java b/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
index db1fe66..63c2522 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/ResourceSharingTestComponent.java
@@ -36,6 +36,7 @@ import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
@@ -77,8 +78,12 @@ public class ResourceSharingTestComponent extends SearchComponent implements Sol
log.info("Informing test component...");
this.core = core;
ParWork.getRootSharedExecutor().submit(() -> {
- core.getCoreContainer().getZkController().getZkStateReader().waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 5, TimeUnit.SECONDS, 1, 1, false);
- this.blob = core.loadDecodeAndCacheBlob(getKey(), new DumbCsvDecoder()).blob;
+ try {
+ core.getCoreContainer().getZkController().getZkStateReader().waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 5, TimeUnit.SECONDS, 1, 1, false);
+ } catch (TimeoutException e) {
+ log.error("timeout", e);
+ }
+ this.blob = core.loadDecodeAndCacheBlob(getKey(), new DumbCsvDecoder()).blob;
});
log.info("Test component informed!");
diff --git a/solr/core/src/test/org/apache/solr/handler/component/ShardsWhitelistTest.java b/solr/core/src/test/org/apache/solr/handler/component/ShardsWhitelistTest.java
index e210011..557b646 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/ShardsWhitelistTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/ShardsWhitelistTest.java
@@ -29,6 +29,8 @@ import org.apache.solr.common.SolrInputDocument;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
@@ -36,11 +38,15 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeoutException;
public class ShardsWhitelistTest extends MultiSolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
/**
* The cluster with this key will include an explicit list of host whitelisted (all hosts in both the clusters)
*/
@@ -148,7 +154,11 @@ public class ShardsWhitelistTest extends MultiSolrCloudTestCase {
clusterId2cluster.forEach((s, miniSolrCloudCluster) -> {
- miniSolrCloudCluster.waitForActiveCollection(COLLECTION_NAME, numShards, numShards);
+ try {
+ miniSolrCloudCluster.waitForActiveCollection(COLLECTION_NAME, numShards, numShards);
+ } catch (TimeoutException e) {
+ log.error("timeout", e);
+ }
List<SolrInputDocument> docs = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
diff --git a/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java b/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
index b2c133b..32c92fa 100644
--- a/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
@@ -151,6 +151,15 @@ public class SolrMetricsIntegrationTest extends SolrTestCaseJ4 {
// MRM TODO: - those timers are disabled right now
// assertEquals("metric counter incorrect", iterations, finalCount - initialCount);
Map<String,SolrMetricReporter> reporters = metricManager.getReporters(coreMetricManager.getRegistryName());
+
+ for (int i = 0; i < 5; i++) {
+ if ((RENAMED_REPORTERS.length + jmxReporter) != reporters.size()) {
+ Thread.sleep(100);
+ } else {
+ break;
+ }
+ }
+
assertEquals(RENAMED_REPORTERS.length + jmxReporter, reporters.size());
// SPECIFIC and MULTIREGISTRY were skipped because they were
diff --git a/solr/core/src/test/org/apache/solr/security/JWTAuthPluginIntegrationTest.java b/solr/core/src/test/org/apache/solr/security/JWTAuthPluginIntegrationTest.java
index 69ee752..bda884a 100644
--- a/solr/core/src/test/org/apache/solr/security/JWTAuthPluginIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/security/JWTAuthPluginIntegrationTest.java
@@ -299,7 +299,7 @@ public class JWTAuthPluginIntegrationTest extends SolrCloudAuthTestCase {
return new Pair<>(result, code);
}
- private void createCollection(String collectionName) throws IOException {
+ private void createCollection(String collectionName) throws Exception {
assertEquals(200, get(baseUrl + "/admin/collections?action=CREATE&name=" + collectionName + "&numShards=2", jwtTestToken).second().intValue());
cluster.waitForActiveCollection(collectionName, 2, 2);
}
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java
index 86d3415..6448df4 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdateWithRouteField.java
@@ -55,7 +55,7 @@ public class TestInPlaceUpdateWithRouteField extends SolrCloudTestCase {
private static final int NUMBER_OF_DOCS = 100;
private static final String COLLECTION = "collection1";
- private static final String[] shards = new String[]{"shard1","shard2","shard3"};
+ private static final String[] shards = new String[]{"s1","s2","s3"};
@BeforeClass
public static void setupCluster() throws Exception {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 1020d22..8aa2055 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -159,8 +159,8 @@ public class Http2SolrClient extends SolrClient {
*/
private volatile String serverBaseUrl;
private volatile boolean closeClient;
- private SolrQueuedThreadPool httpClientExecutor;
- private SolrScheduledExecutorScheduler scheduler;
+ private volatile SolrQueuedThreadPool httpClientExecutor;
+ private volatile SolrScheduledExecutorScheduler scheduler;
private volatile boolean closed;
protected Http2SolrClient(String serverBaseUrl, Builder builder) {
@@ -516,7 +516,7 @@ public class Http2SolrClient extends SolrClient {
req.afterSend.run();
}
} catch (Exception e) {
-
+ log.debug("failed sending request", e);
if (e != CANCELLED_EXCEPTION) {
asyncListener.onFailure(e, 500);
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index e4ce6e5..44c85c6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -153,9 +153,6 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider, Repli
@Override
public void connect() {
- if (isClosed) {
- throw new AlreadyClosedException();
- }
if (this.zkStateReader == null) {
synchronized (this) {
if (this.zkStateReader == null) {
@@ -167,9 +164,6 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider, Repli
}
public ZkStateReader getZkStateReader() {
- if (isClosed) {
- throw new AlreadyClosedException();
- }
if (zkStateReader == null) {
synchronized (this) {
@@ -183,10 +177,6 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider, Repli
@Override
public void close() throws IOException {
- if (isClosed) {
- return;
- }
-
final ZkStateReader zkToClose = zkStateReader;
if (zkToClose != null && closeZkStateReader) {
IOUtils.closeQuietly(zkToClose);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index d7d41d4..2b1dbaf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -225,7 +225,6 @@ public class ClusterState implements JSONWriter.Writable {
return null;
}
DocCollection docCollection = cs.getCollectionsMap().values().iterator().next();
- docCollection.setZnodeVersion(version);
return docCollection;
}
@@ -237,11 +236,11 @@ public class ClusterState implements JSONWriter.Writable {
collections.put(collectionName, new CollectionRef(coll));
}
- return new ClusterState(collections, version);
+ return new ClusterState(collections, -1);
}
// TODO move to static DocCollection.loadFromMap
- private static DocCollection collectionFromObjects(Replica.NodeNameToBaseUrl zkStateReader, String name, Map<String, Object> objs, int version) {
+ private static DocCollection collectionFromObjects(Replica.NodeNameToBaseUrl zkStateReader, String name, Map<String, Object> objs, Integer version) {
Map<String,Object> props;
Map<String,Slice> slices;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 13ad78c..256413e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
@@ -51,7 +52,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
- private int znodeVersion;
+ private volatile int znodeVersion;
private final String name;
private final Map<String, Slice> slices;
@@ -63,13 +64,13 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
private final Integer numPullReplicas;
private final Integer maxShardsPerNode;
private final Boolean readOnly;
- private volatile Map stateUpdates;
+ private volatile ConcurrentHashMap stateUpdates;
private final Long id;
private AtomicInteger sliceAssignCnt = new AtomicInteger();
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
- this(name, slices, props, router, -1, null);
+ this(name, slices, props, router, 0, new ConcurrentHashMap());
}
/**
@@ -78,11 +79,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
* @param props The properties of the slice. This is used directly and a copy is not made.
* @param zkVersion The version of the Collection node in Zookeeper (used for conditional updates).
*/
- public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, Map stateUpdates) {
+ public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, ConcurrentHashMap stateUpdates) {
super(props==null ? props = new HashMap<>() : props);
+
this.znodeVersion = zkVersion;
this.name = name;
- this.stateUpdates = stateUpdates;
+ if (stateUpdates == null) {
+ this.stateUpdates = new ConcurrentHashMap();
+ } else {
+ this.stateUpdates = stateUpdates;
+ }
this.slices = slices;
this.replicationFactor = (Integer) verifyProp(props, REPLICATION_FACTOR);
this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS, 0);
@@ -217,11 +223,12 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
* Get the list of all leaders hosted on the given node or <code>null</code> if none.
*/
public List<Replica> getLeaderReplicas(String nodeName) {
- Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
+ List<String> shuffleSlices = new ArrayList<>(slices.keySet());
+ Collections.shuffle(shuffleSlices);
List<Replica> leaders = new ArrayList<>(slices.size());
- while (iter.hasNext()) {
- Map.Entry<String, Slice> slice = iter.next();
- Replica leader = slice.getValue().getLeader();
+ for (String s : shuffleSlices) {
+ Slice slice = slices.get(s);
+ Replica leader = slice.getLeader();
if (leader != null && leader.getNodeName().equals(nodeName)) {
leaders.add(leader);
}
@@ -446,9 +453,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return stateUpdates != null;
}
- public void setStateUpdates(Map stateUpdates) {
- this.stateUpdates = stateUpdates;
- }
+// public void setStateUpdates(ConcurrentHashMap stateUpdates) {
+// this.stateUpdates = stateUpdates;
+// }
public void setSliceAssignCnt(int i) {
sliceAssignCnt.set(i);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 3f58db4..282f670 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -235,10 +235,13 @@ public class Replica extends ZkNodeProps {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
-
Replica replica = (Replica) o;
+ return name.equals(replica.name) && nodeName.equals(replica.nodeName);
+ }
- return name.equals(replica.name);
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, nodeName);
}
/** Also known as coreNodeName. */
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 1aaa259..64d8ede 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -103,7 +103,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
Object removed = replica.getProperties().remove("numShards");
}
- for (Replica replica : replicas.values()) {
+ for (Replica replica : currentSlice.getReplicas()) {
if (!replicas.containsKey(replica.getName())) {
replicas.put(replica.getName(), replica);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 454e30c..50e364d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -1050,7 +1050,8 @@ public class SolrZkClient implements Closeable {
}
if ((stat != null && stat.getDataLength() < maxBytesBeforeSuppress && lines < 4) || path.endsWith("state.json") || path
- .endsWith("security.json") || (path.endsWith("solrconfig.xml") && Boolean.getBoolean("solr.tests.printsolrconfig")) || path.endsWith("_statupdates") || path.contains("/terms/")) {
+ .endsWith("security.json") || (path.endsWith("solrconfig.xml") && Boolean.getBoolean("solr.tests.printsolrconfig")) || path.endsWith("_statupdates")
+ || path.contains("/terms/") || path.endsWith("leader")) {
// if (path.endsWith(".xml")) {
// // this is the cluster state in xml format - lets pretty print
// dataString = prettyPrint(path, dataString);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 256859d..5d72e29 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -90,7 +90,7 @@ import static java.util.Collections.emptySortedSet;
import static org.apache.solr.common.util.Utils.fromJSON;
public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
- public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
+ public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 1000); // delay between cloud state updates
public static final String STRUCTURE_CHANGE_NOTIFIER = "_scn";
public static final String STATE_UPDATES = "_statupdates";
@@ -245,6 +245,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
private volatile String node = null;
private volatile LiveNodeWatcher liveNodesWatcher;
private volatile CollectionsChildWatcher collectionsChildWatcher;
+ private volatile IsLocalLeader isLocalLeader;
public static interface CollectionRemoved {
void removed(String collection);
@@ -392,7 +393,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
// Need a copy so we don't delete from what we're iterating over.
watchedCollectionStates.forEach((name, coll) -> {
DocCollection newState = null;
- ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
+
+ if (!collectionStateLocks.containsKey(name)) {
+ ReentrantLock collectionStateLock = new ReentrantLock(true);
+ ReentrantLock oldLock = collectionStateLocks.putIfAbsent(name, collectionStateLock);
+ }
+
+ ReentrantLock collectionStateLock = collectionStateLocks.get(name);
collectionStateLock.lock();
try {
try {
@@ -433,17 +440,29 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Set<DocCollection> updatedCollections = new HashSet<>();
- DocCollection newState = fetchCollectionState(name);
+ if (!collectionStateLocks.containsKey(name)) {
+ ReentrantLock collectionStateLock = new ReentrantLock(true);
+ ReentrantLock oldLock = collectionStateLocks.putIfAbsent(name, collectionStateLock);
+ }
- String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
+ ReentrantLock collectionStateLock = collectionStateLocks.get(name);
+ collectionStateLock.lock();
try {
- getAndProcessStateUpdates(name, stateUpdatesPath, newState, true);
- } catch (Exception e) {
- log.error("Error fetching state updates", e);
- }
- if (updateWatchedCollection(name, newState == null ? null : new ClusterState.CollectionRef(newState))) {
- updatedCollections.add(newState);
+ DocCollection newState = fetchCollectionState(name);
+
+ String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
+ try {
+ getAndProcessStateUpdates(name, stateUpdatesPath, newState, true);
+ } catch (Exception e) {
+ log.error("Error fetching state updates", e);
+ }
+
+ if (updateWatchedCollection(name, newState == null ? null : new ClusterState.CollectionRef(newState))) {
+ updatedCollections.add(newState);
+ }
+ } finally {
+ collectionStateLock.unlock();
}
} catch (KeeperException e) {
@@ -466,25 +485,20 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
log.debug("compareStateVersions {} {} {}", coll, version, updateHash);
DocCollection collection = getCollectionOrNull(coll);
if (collection == null) return null;
- if (collection.getZNodeVersion() != version || (collection.getZNodeVersion() == version && collection.hasStateUpdates() && updateHash != collection.getStateUpdates().hashCode())) {
+ if (collection.getZNodeVersion() != version || (collection.getZNodeVersion() == version && updateHash != collection.getStateUpdates().hashCode())) {
if (log.isDebugEnabled()) {
log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
}
DocCollection nu = getCollectionLive(coll);
log.debug("got collection {} {} {}", nu);
if (nu == null) return -3;
-
- constructState(nu, "compareStateVersions");
- }
-
- if (collection.getZNodeVersion() == version && (!collection.hasStateUpdates() || updateHash == collection.getStateUpdates().hashCode())) {
- return null;
}
if (log.isDebugEnabled()) {
- log.debug("Wrong version from client [{}]!=[{}]", version, collection.getZNodeVersion());
+ log.debug("Wrong version from client [{}]!=[{}] updatesHash={}", version, collection.getZNodeVersion(), collection.getStateUpdates().hashCode());
}
+ // TODO: return state update hash as well
return collection.getZNodeVersion();
}
@@ -680,14 +694,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
LazyCollectionRef old = lazyCollectionStates.putIfAbsent(coll, docRef);
if (old == null) {
clusterState.put(coll, docRef);
- ReentrantLock collectionStateLock = new ReentrantLock(true);
- ReentrantLock oldLock = collectionStateLocks.putIfAbsent(coll, collectionStateLock);
log.debug("Created lazy collection {} interesting [{}] watched [{}] lazy [{}] total [{}]", coll, collectionWatches.keySet().size(),
watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.size());
}
}
}
+ if (!collectionStateLocks.containsKey(coll)) {
+ ReentrantLock collectionStateLock = new ReentrantLock(true);
+ ReentrantLock oldLock = collectionStateLocks.putIfAbsent(coll, collectionStateLock);
+ }
}
List<String> finalChildren = children;
@@ -979,6 +995,15 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
this.node = node;
}
+ public void setLeaderChecker(IsLocalLeader isLocalLeader) {
+ this.isLocalLeader = isLocalLeader;
+ }
+
+ public interface IsLocalLeader {
+ boolean isLocalLeader(String name);
+ }
+
+
/**
* Get shard leader properties, with retry if none exist.
*/
@@ -990,10 +1015,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return getLeaderRetry(collection, shard, timeout, false);
}
+ public Replica getLeaderRetry(String collection, String shard, int timeout, boolean checkValidLeader) throws InterruptedException, TimeoutException {
+ return getLeaderRetry(null, collection, shard, timeout, checkValidLeader);
+ }
/**
* Get shard leader properties, with retry if none exist.
*/
- public Replica getLeaderRetry(String collection, String shard, int timeout, boolean checkValidLeader) throws InterruptedException, TimeoutException {
+ public Replica getLeaderRetry(Http2SolrClient httpClient, String collection, String shard, int timeout, boolean checkValidLeader) throws InterruptedException, TimeoutException {
+ log.debug("get leader timeout={}", timeout);
AtomicReference<Replica> returnLeader = new AtomicReference<>();
DocCollection coll;
int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
@@ -1010,7 +1039,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (leader.getState() != Replica.State.ACTIVE) {
return false;
}
-
+ log.debug("Found ACTIVE leader for slice={} leader={}", slice.getName(), leader);
returnLeader.set(leader);
return true;
}
@@ -1019,32 +1048,40 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
});
} catch (TimeoutException e) {
coll = getCollectionOrNull(collection);
+ log.debug("timeout out while waiting to see leader in cluster state {} {}", shard, coll);
throw new TimeoutException(
- "No registered leader was found after waiting for " + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection)
+ "No registered leader was found after waiting for " + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + coll
+ " with live_nodes=" + liveNodes);
}
Replica leader = returnLeader.get();
- if (checkValidLeader) {
- try (Http2SolrClient client = new Http2SolrClient.Builder("").idleTimeout(readTimeout).markInternalRequest().build()) {
- CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
- prepCmd.setCoreName(leader.getName());
- prepCmd.setLeaderName(leader.getName());
- prepCmd.setCollection(leader.getCollection());
- prepCmd.setShardId(leader.getSlice());
-
- prepCmd.setBasePath(leader.getBaseUrl());
-
- try {
- NamedList<Object> result = client.request(prepCmd);
+ if (checkValidLeader && leader != null) {
+ log.info("checking if found leader is valid {}",leader);
+ if (node != null && isLocalLeader != null && leader.getNodeName().equals(node)) {
+ if (isLocalLeader.isLocalLeader(leader.getName())) {
break;
- } catch (Exception e) {
- log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+ }
+ } else {
+
+ try (Http2SolrClient client = new Http2SolrClient.Builder(leader.getBaseUrl()).idleTimeout(readTimeout).withHttpClient(httpClient).markInternalRequest().build()) {
+ CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+ prepCmd.setCoreName(leader.getName());
+ prepCmd.setLeaderName(leader.getName());
+ prepCmd.setCollection(leader.getCollection());
+ prepCmd.setShardId(leader.getSlice());
+ prepCmd.setBasePath(leader.getBaseUrl());
+ try {
+ NamedList<Object> result = client.request(prepCmd);
+ break;
+ } catch (Exception e) {
+ log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+ }
}
}
if (leaderVerifyTimeout.hasTimedOut()) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) + " with live_nodes=" + liveNodes);
+ log.debug("timeout out while checking if found leader is valid {}", leader);
+ throw new TimeoutException("No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) +
+ " with live_nodes=" + liveNodes);
}
} else {
@@ -1052,8 +1089,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
if (returnLeader.get() == null) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) + " with live_nodes=" + liveNodes);
+ log.debug("return leader is null");
+ throw new TimeoutException("No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) +
+ " with live_nodes=" + liveNodes);
}
return returnLeader.get();
}
@@ -1109,11 +1147,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public List<Replica> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
Replica.State mustMatchStateFilter, Replica.State mustMatchStateFilter2) {
//TODO: We don't need all these getReplicaProps method overloading. Also, it's odd that the default is to return replicas of type TLOG and NRT only
- return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, mustMatchStateFilter2, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT));
+ Set<Replica.State> matchFilters = new HashSet<>(2);
+ matchFilters.add(mustMatchStateFilter);
+ matchFilters.add(mustMatchStateFilter2);
+ return getReplicaProps(collection, shardId, thisCoreNodeName, matchFilters, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT));
}
public List<Replica> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
- Replica.State mustMatchStateFilter, Replica.State mustMatchStateFilter2, final EnumSet<Replica.Type> acceptReplicaType) {
+ Collection<Replica.State> matchStateFilters, final EnumSet<Replica.Type> acceptReplicaType) {
assert thisCoreNodeName != null;
ClusterState.CollectionRef docCollectionRef = clusterState.get(collection);
@@ -1139,7 +1180,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
String coreNodeName = entry.getValue().getName();
if (liveNodes.contains(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
- if (mustMatchStateFilter == null || (mustMatchStateFilter == nodeProps.getState() || mustMatchStateFilter2 == nodeProps.getState())) {
+ if (matchStateFilters == null || matchStateFilters.size() == 0 || matchStateFilters.contains(nodeProps.getState())) {
nodes.add(nodeProps);
}
}
@@ -1646,13 +1687,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
log.error("An error has occurred", e);
return;
}
-
- constructState(null, "collection child watcher");
}
public void refresh() {
try {
refreshCollectionList();
+
+ constructState(null, "collection child watcher");
} catch (AlreadyClosedException e) {
} catch (KeeperException e) {
@@ -1755,7 +1796,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
newState = fetchCollectionState(coll);
String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
newState = getAndProcessStateUpdates(coll, stateUpdatesPath, newState, true);
- // constructState(newState, "getCollectionLive");
} catch (KeeperException e) {
log.warn("Zookeeper error getting latest collection state for collection={}", coll, e);
return null;
@@ -1771,7 +1811,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, DocCollection docCollection, boolean live) throws KeeperException, InterruptedException {
try {
- log.trace("get and process state updates for {}", coll);
+ log.debug("get and process state updates for {}", coll);
Stat stat;
try {
@@ -1785,9 +1825,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
if (docCollection != null && docCollection.hasStateUpdates()) {
- int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
- if (stat.getVersion() < oldVersion) {
- if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
+ Integer oldVersion = (Integer) docCollection.getStateUpdates().get("_ver_");
+ if (oldVersion != null && stat.getVersion() < oldVersion) {
+ if (log.isDebugEnabled()) log.debug("Will not apply state updates based on updates znode, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
return docCollection;
}
}
@@ -1813,6 +1853,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return docCollection;
}
+ m = new ConcurrentHashMap<>(m);
+
Integer version = Integer.parseInt((String) m.get("_cs_ver_"));
log.trace("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
@@ -1822,14 +1864,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Set<Entry<String,Object>> entrySet = m.entrySet();
if (docCollection != null) {
- if (version < docCollection.getZNodeVersion()) {
- if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+ if (version > 0 && version < docCollection.getZNodeVersion()) {
+ if (log.isDebugEnabled()) log.debug("Will not apply state updates based on state.json znode, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
return docCollection;
}
if (docCollection.hasStateUpdates()) {
- int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
- if (stat.getVersion() < oldVersion) {
+ Integer oldVersion = (Integer) docCollection.getStateUpdates().get("_ver_");
+ if (oldVersion != null && stat.getVersion() < oldVersion) {
if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
return docCollection;
}
@@ -1845,7 +1887,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
- log.trace("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica.getName(), id, docCollection.getReplicaByIds());
+ log.trace("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica == null ? null : replica.getName(), id, docCollection.getReplicaByIds());
if (replica != null) {
@@ -1893,7 +1935,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
- DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), version, m);
+ DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), docCollection.getZNodeVersion(), (ConcurrentHashMap) m);
docCollection = newDocCollection;
} else {
@@ -1917,7 +1959,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
} catch (Exception e) {
- log.error("exeption trying to process additional updates", e);
+ log.error("Exception trying to process additional updates", e);
}
return docCollection == null ? docCollection : docCollection;
@@ -2168,6 +2210,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
CollectionStateWatcher sw = watchSet.get();
if (sw != null) {
sw.refresh();
+ } else {
constructState(null, "registerDocCollectionWatcher");
}
@@ -2209,11 +2252,12 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
*/
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException {
-
- DocCollection coll = getCollectionOrNull(collection);
- if (predicate.matches(getLiveNodes(), coll)) {
- return;
- }
+// NOTE: you cannot short-circuit like this - if a ZkStateReader has this collection registered as lazy and
+// waitForState is called, such a shortcut skips creating the temporary watched collection that guarantees
+// notification of new collection state.
+// DocCollection coll = getCollectionOrNull(collection);
+// if (predicate.matches(getLiveNodes(), coll)) {
+// return;
+// }
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
org.apache.solr.common.cloud.CollectionStateWatcher watcher = new PredicateMatcher(predicate, latch, docCollection).invoke();
@@ -2229,40 +2273,129 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
- public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas) {
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas) throws TimeoutException {
waitForActiveCollection(collection, wait, unit, shards, totalReplicas, false);
}
- public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
- waitForActiveCollection(collection, wait, unit, false, shards, totalReplicas, true);
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) throws TimeoutException {
+ waitForActiveCollection(collection, wait, unit, false, shards, totalReplicas, exact, false);
+ }
+
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, boolean justLeaders, int shards, int totalReplicas, boolean exact, boolean checkValidLeaders)
+ throws TimeoutException {
+ waitForActiveCollection(null, collection, wait, unit, justLeaders, shards, totalReplicas, exact, checkValidLeaders);
}
- public void waitForActiveCollection(String collection, long wait, TimeUnit unit, boolean justLeaders, int shards, int totalReplicas, boolean exact) {
+ public void waitForActiveCollection(Http2SolrClient client, String collection, long wait, TimeUnit unit, boolean justLeaders, int shards, int totalReplicas, boolean exact, boolean checkValidLeaders)
+ throws TimeoutException {
log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
clusterState.size());
assert collection != null;
- CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(justLeaders, shards, totalReplicas, exact);
+ TimeOut leaderVerifyTimeout = new TimeOut(wait, unit, TimeSource.NANO_TIME);
+ while (true) {
+ CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(justLeaders, shards, totalReplicas, exact);
- AtomicReference<DocCollection> state = new AtomicReference<>();
- AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
- try {
- waitForState(collection, wait, unit, (n, c) -> {
- state.set(c);
- liveNodesLastSeen.set(n);
+ AtomicReference<DocCollection> state = new AtomicReference<>();
+ AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
+ try {
+ waitForState(collection, wait, unit, (n, c) -> {
+ state.set(c);
+ liveNodesLastSeen.set(n);
- return predicate.matches(n, c);
- });
- } catch (TimeoutException e) {
- throw new RuntimeException("Failed while waiting for active collection" + "\n" + e.getMessage() + " \nShards:" + shards + " Replicas:" + totalReplicas + "\nLive Nodes: " + Arrays.toString(liveNodesLastSeen.get().toArray())
- + "\nLast available state: " + state.get());
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new RuntimeException("", e);
+ return predicate.matches(n, c);
+ });
+ } catch (TimeoutException e) {
+ throw new TimeoutException("Failed while waiting for active collection" + "\n" + e.getMessage() + " \nShards:" + shards + " Replicas:" + totalReplicas + "\nLive Nodes: " + Arrays
+ .toString(liveNodesLastSeen.get().toArray()) + "\nLast available state: " + state.get());
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new RuntimeException("", e);
+ }
+
+ if (checkValidLeaders) {
+ Boolean success;
+
+ try (Http2SolrClient httpClient = new Http2SolrClient.Builder("").idleTimeout(5000).withHttpClient(client).markInternalRequest().build()) {
+ success = checkLeaders(collection, shards, httpClient);
+ }
+
+ if (success == null || !success) {
+ log.info("Failed confirming all shards have valid leaders");
+ } else {
+ log.info("done checking valid leaders on active collection success={}", success);
+ break;
+ }
+ if (leaderVerifyTimeout.hasTimedOut()) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "No registered leader was found " + "collection: " + collection + " saw state=" + clusterState.get(collection) + " with live_nodes=" + liveNodes);
+ }
+
+ } else {
+ break;
+ }
}
}
+ /**
+ * Verifies that every shard of the collection has a leader that answers a
+ * WaitForState prep command (or, for leaders on this node, passes the local
+ * isLocalLeader check). Returns null when the collection is not found,
+ * true when all {@code shards} leaders verified, false otherwise.
+ */
+ private Boolean checkLeaders(String collection, int shards, Http2SolrClient client) {
+ DocCollection coll = getCollectionOrNull(collection);
+ if (coll == null) {
+ return null;
+ }
+
+ Collection<Slice> slices = coll.getSlices();
+ boolean success = true;
+ int validCnt = 0;
+ for (Slice slice : slices) {
+ Replica leader = slice.getLeader();
+ if (leader != null) {
+
+ if (node != null && isLocalLeader != null && leader.getNodeName().equals(node)) {
+ if (isLocalLeader.isLocalLeader(leader.getName())) {
+ // count locally-verified leaders too - otherwise the validCnt == shards
+ // check below can never pass when a leader lives on this node
+ validCnt++;
+ } else {
+ // one placeholder for the single argument (was "{} {}" with one arg)
+ log.info("failed checking for local leader {}", leader.getName());
+ success = false;
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException interruptedException) {
+ ParWork.propagateInterrupt(interruptedException);
+ }
+ break;
+ }
+ } else {
+ // remote leader: issue a WaitForState prep command against the leader's node
+ CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+ prepCmd.setCoreName(leader.getName());
+ prepCmd.setLeaderName(leader.getName());
+ prepCmd.setCollection(leader.getCollection());
+ prepCmd.setShardId(leader.getSlice());
+
+ prepCmd.setBasePath(leader.getBaseUrl());
+
+ try {
+ NamedList<Object> result = client.request(prepCmd);
+ log.info("Leader looks valid {}", leader);
+ validCnt++;
+ } catch (Exception e) {
+ log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+ success = false;
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException interruptedException) {
+ ParWork.propagateInterrupt(interruptedException);
+ }
+ break;
+ }
+ }
+ } else {
+ // no leader published yet for this slice
+ success = false;
+ }
+ }
+ if (validCnt != shards) {
+ return false;
+ }
+ return success;
+ }
+
/**
* Block until a LiveNodesStatePredicate returns true, or the wait times out
* <p>
@@ -2279,10 +2412,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public void waitForLiveNodes(long wait, TimeUnit unit, LiveNodesPredicate predicate)
throws InterruptedException, TimeoutException {
- if (predicate.matches(liveNodes)) {
- return;
- }
-
final CountDownLatch latch = new CountDownLatch(1);
LiveNodesListener listener = (n) -> {
@@ -2320,8 +2449,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
final DocCollectionAndLiveNodesWatcherWrapper wrapper
= new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher);
- removeDocCollectionWatcher(collection, wrapper);
removeLiveNodesListener(wrapper);
+ removeDocCollectionWatcher(collection, wrapper);
}
/**
@@ -2432,7 +2561,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return newState;
}
- if (docCollRef.isLazilyLoaded()) {
+ if (docCollRef.isLazilyLoaded()) { // should not happen
if (watchedCollectionStates.containsKey(coll)) {
update.set(true);
LazyCollectionRef prev = lazyCollectionStates.remove(coll);
@@ -2950,28 +3079,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (leader.getState() != Replica.State.ACTIVE) {
return false;
}
-// CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
-// prepCmd.setCoreName(leader.getName());
-// prepCmd.setLeaderName(leader.getName());
-// prepCmd.setCollection(collectionState.getName());
-// prepCmd.setShardId(slice.getName());
-//
-// int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
-//
-// try (Http2SolrClient client = new Http2SolrClient.Builder(leader.getBaseUrl()).idleTimeout(readTimeout).markInternalRequest().build()) {
-//
-// prepCmd.setBasePath(leader.getBaseUrl());
-//
-// try {
-// NamedList<Object> result = client.request(prepCmd);
-// } catch (SolrServerException | BaseHttpSolrClient.RemoteSolrException e) {
-// log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
-// return false;
-// } catch (IOException e) {
-// log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
-// return false;
-// }
-// }
}
if (!justLeaders) {
for (Replica replica : slice) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrInternalHttpClient.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrInternalHttpClient.java
index a00d893..bc3abcc 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrInternalHttpClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrInternalHttpClient.java
@@ -21,8 +21,11 @@ public class SolrInternalHttpClient extends HttpClient {
if (log.isDebugEnabled()) {
log.debug("Stopping {}", this.getClass().getSimpleName());
}
- super.doStop();
- assert ObjectReleaseTracker.release(this);
+ try {
+ super.doStop();
+ } finally {
+ assert ObjectReleaseTracker.release(this);
+ }
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 7be6ad6..4d15c24 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -770,49 +770,52 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
// }
public void close() {
-
- removeBean(_tryExecutor);
- _tryExecutor = TryExecutor.NO_TRY;
-
try {
- super.doStop();
- } catch (Exception e) {
- LOG.warn("super.doStop", e);
- return;
- }
+ removeBean(_tryExecutor);
+ _tryExecutor = TryExecutor.NO_TRY;
- setMinThreads(0);
- setIdleTimeout(1);
- setStopTimeout(1);
- // Signal the Runner threads that we are stopping
- int threads = _counts.getAndSetHi(Integer.MIN_VALUE);
+ try {
+ super.doStop();
+ } catch (Exception e) {
+ LOG.warn("super.doStop", e);
- BlockingQueue<Runnable> jobs = getQueue();
+ }
+ setMinThreads(0);
+ setIdleTimeout(1);
+ setStopTimeout(1);
+ // Signal the Runner threads that we are stopping
+ int threads = _counts.getAndSetHi(Integer.MIN_VALUE);
- for (int i = 0; i < threads; ++i) {
- jobs.offer(NOOP);
- }
+ BlockingQueue<Runnable> jobs = getQueue();
+ for (int i = 0; i < threads; ++i) {
+ jobs.offer(NOOP);
+ }
- closed = true;
+ closed = true;
- if (getBusyThreads() > 0) {
+ if (getBusyThreads() > 0) {
- try {
- joinThreads(TimeUnit.MILLISECONDS.toNanos(250));
- } catch (InterruptedException e) {
- LOG.warn("Interrupted in joinThreads on close {}", e);
- } catch (TimeoutException e) {
- LOG.warn("Timeout in joinThreads on close {}", e);
- } catch (ExecutionException e) {
- LOG.warn("Execution exception in joinThreads on close {}", e);
+ try {
+ joinThreads(TimeUnit.MILLISECONDS.toNanos(250));
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted in joinThreads on close {}", e);
+ } catch (TimeoutException e) {
+ LOG.warn("Timeout in joinThreads on close {}", e);
+ } catch (ExecutionException e) {
+ LOG.warn("Execution exception in joinThreads on close {}", e);
+ }
}
- }
- if (_budget != null) _budget.reset();
+ if (_budget != null) _budget.reset();
+ } catch (RuntimeException e) {
+ log.warn("Exception closing", e);
+ throw e;
+ } finally {
+ assert ObjectReleaseTracker.release(this);
+ }
- assert ObjectReleaseTracker.release(this);
}
// @Override
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index eb66667..fcb1236 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -325,6 +325,8 @@ public class SolrTestCase extends Assert {
System.setProperty("urlScheme", "http");
}
+ System.setProperty("lucene.cms.override_spins", "true"); // TODO: detecting spins for every core / every IndexWriter ConcurrentMergeScheduler can be costly; consider detecting once and caching the result
+
System.setProperty("useCompoundFile", "true");
System.setProperty("solr.tests.maxBufferedDocs", "1000");
@@ -429,7 +431,6 @@ public class SolrTestCase extends Assert {
System.setProperty("solr.dependentupdate.timeout", "1500");
// System.setProperty("lucene.cms.override_core_count", "3");
- // System.setProperty("lucene.cms.override_spins", "false");
// unlimited - System.setProperty("solr.maxContainerThreads", "300");
System.setProperty("solr.lowContainerThreadsThreshold", "-1");
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
index 232cb90..a92d394 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
@@ -109,7 +109,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
// // give everyone there own solrhome
// File jettyHome = new File(new File(getSolrHome()).getParentFile(), "jetty" + homeCount.incrementAndGet());
// setupJettySolrHome(jettyHome);
-// JettySolrRunner j = createJetty(jettyHome, null, "shard" + (i + 2));
+// JettySolrRunner j = createJetty(jettyHome, null, "s" + (i + 2));
// j.start();
// jettys.add(j);
// clients.add(createNewSolrClient(j.getLocalPort()));
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 03b3a5f..3e1c4bb 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -152,8 +152,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
}
- public static final String SHARD1 = "shard1";
- public static final String SHARD2 = "shard2";
+ public static final String SHARD1 = "s1";
+ public static final String SHARD2 = "s2";
protected boolean printLayoutOnTearDown = false;
@@ -476,7 +476,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
j.start();
jettys.add(j);
createReplicaRequests.add(CollectionAdminRequest
- .addReplicaToShard(DEFAULT_COLLECTION, "shard" + ((currentI % sliceCount) + 1))
+ .addReplicaToShard(DEFAULT_COLLECTION, "s" + ((currentI % sliceCount) + 1))
.setNode(j.getNodeName())
.setType(Replica.Type.TLOG));
waitForLiveNode(j);
@@ -506,7 +506,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
j.start();
jettys.add(j);
createReplicaRequests.add(CollectionAdminRequest
- .addReplicaToShard(DEFAULT_COLLECTION, "shard" + ((currentI % sliceCount) + 1))
+ .addReplicaToShard(DEFAULT_COLLECTION, "s" + ((currentI % sliceCount) + 1))
.setNode(j.getNodeName())
.setType(Replica.Type.NRT));
waitForLiveNode(j);
@@ -532,7 +532,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
j.start();
jettys.add(j);
createPullReplicaRequests.add(CollectionAdminRequest
- .addReplicaToShard(DEFAULT_COLLECTION, "shard" + ((currentI % sliceCount) + 1))
+ .addReplicaToShard(DEFAULT_COLLECTION, "s" + ((currentI % sliceCount) + 1))
.setNode(j.getNodeName())
.setType(Replica.Type.PULL));
waitForLiveNode(j);
@@ -596,13 +596,17 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
// MiniSolrCloudCluster.expectedShardsAndActiveReplicas(sliceCount, addReplicas.get()));
waitForActiveReplicaCount(cloudClient, DEFAULT_COLLECTION, addReplicas.get());
+
this.jettys.addAll(jettys);
this.clients.addAll(clients);
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+
+ zkStateReader.waitForActiveCollection(DEFAULT_COLLECTION, 10, TimeUnit.SECONDS, false, sliceCount, addReplicas.get(), true, true);
+
// make sure we have a leader for each shard
for (int i = 1; i <= sliceCount; i++) {
- zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + i, 10000);
+ zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "s" + i, 10000);
}
if (sliceCount > 0) {
@@ -760,6 +764,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
.withServlets(getExtraServlets())
.withFilters(getExtraRequestFilters())
.withSSLConfig(sslConfig.buildServerSSLConfig())
+ .enableProxy(true)
.build();
Properties props = new Properties();
@@ -778,7 +783,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
props.setProperty("coreRootDirectory", solrHome.toPath().resolve("cores").toAbsolutePath().toString());
- JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), props, jettyconfig, true);
+ JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), props, jettyconfig);
return jetty;
}
@@ -961,7 +966,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
StringBuilder sb = new StringBuilder();
for (int i = 0; i < sliceCount; i++) {
if (i > 0) sb.append(',');
- sb.append("shard").append(i + 1);
+ sb.append("s").append(i + 1);
}
params.set("shards", sb.toString());
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 53b09bb..815d4e5 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.embedded.SSLConfig;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
@@ -135,7 +136,7 @@ public class MiniSolrCloudCluster {
private final boolean externalZkServer;
private final List<JettySolrRunner> jettys = new CopyOnWriteArrayList<>();
private final Path baseDir;
- private CloudHttp2SolrClient solrClient;
+ private volatile CloudHttp2SolrClient solrClient;
private final JettyConfig jettyConfig;
private final boolean trackJettyMetrics;
@@ -612,17 +613,24 @@ public class MiniSolrCloudCluster {
final Set<String> collections = reader.getClusterState().getCollectionsMap().keySet();
try (ParWork work = new ParWork(this, false, false)) {
collections.forEach(collection -> {
- work.collect("", ()->{
- try {
- CollectionAdminRequest.deleteCollection(collection).process(solrClient);
- } catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- });
+ work.collect("", () -> {
+ try {
+ CollectionAdminRequest.deleteCollection(collection).process(solrClient);
+ } catch (SolrException e) {
+ if (e.code() == 400) {
+ log.warn("400 on collection delete (likely already gone)", e);
+
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ });
});
}
}
-
+
public void deleteAllConfigSets() throws SolrServerException, IOException {
List<String> configSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
@@ -663,6 +671,8 @@ public class MiniSolrCloudCluster {
} finally {
System.clearProperty("zkHost");
solrClient = null;
+ solrZkClient = null;
+ zkStateReader = null;
assert ObjectReleaseTracker.release(this);
}
@@ -848,21 +858,24 @@ public class MiniSolrCloudCluster {
throw new SolrException(ErrorCode.NOT_FOUND, "No open Overseer found");
}
- public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas) {
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas) throws TimeoutException {
waitForActiveCollection(collection, wait, unit, shards, totalReplicas, false);
}
- public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) throws TimeoutException {
zkStateReader.waitForActiveCollection(collection, wait, unit, shards, totalReplicas, exact);
}
- public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, boolean justLeaders, int shards, int totalReplicas, boolean exact, boolean verifyLeaders) throws TimeoutException {
+ zkStateReader.waitForActiveCollection(collection, wait, unit, justLeaders, shards, totalReplicas, exact, verifyLeaders);
+ }
+
+ public void waitForActiveCollection(String collection, int shards, int totalReplicas) throws TimeoutException {
if (collection == null) throw new IllegalArgumentException("null collection");
waitForActiveCollection(collection, 60, TimeUnit.SECONDS, shards, totalReplicas);
}
- public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) {
-
+ public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) throws TimeoutException {
waitForActiveCollection(collection, 60, TimeUnit.SECONDS, shards, totalReplicas, exact);
}
diff --git a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
index 6f81795..55547c9 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
@@ -72,6 +72,7 @@
<AsyncLogger name="org.apache.solr.cloud.StatePublisher" level="DEBUG"/>
<AsyncLogger name="org.apache.solr.core.SolrCore" level="DEBUG"/>
+ <AsyncLogger name="org.apache.solr.core.CachingDirectoryFactory" level="DEBUG"/>
<AsyncLogger name="org.apache.solr.core.CoreContainer" level="DEBUG"/>
<AsyncLogger name="org.apache.solr.common.cloud.ZkMaintenanceUtils" level="DEBUG"/>
<AsyncLogger name="org.apache.solr.update.processor.DistributedZkUpdateProcessor" level="DEBUG"/>