You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/18 20:16:36 UTC
[lucene-solr] branch reference_impl updated: @1468 Stress test
addressing and cleanup, bring back publish recovery and down node.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl by this push:
new b8f7753 @1468 Stress test addressing and cleanup, bring back publish recovery and down node.
b8f7753 is described below
commit b8f77535bf66792d0be1624414bc23959f6b7216
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 18 15:15:52 2021 -0500
@1468 Stress test addressing and cleanup, bring back publish recovery and down node.
---
.../client/solrj/embedded/JettySolrRunner.java | 5 +-
.../java/org/apache/solr/cloud/LeaderElector.java | 6 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 102 ++-
.../org/apache/solr/cloud/OverseerTaskQueue.java | 3 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 120 ++-
.../java/org/apache/solr/cloud/StatePublisher.java | 103 ++-
.../java/org/apache/solr/cloud/ZkController.java | 186 +++--
.../cloud/api/collections/CreateCollectionCmd.java | 13 +-
.../cloud/api/collections/DeleteReplicaCmd.java | 2 +
.../OverseerCollectionMessageHandler.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 294 ++++---
.../java/org/apache/solr/core/CoreContainer.java | 255 +++---
.../src/java/org/apache/solr/core/SolrCore.java | 7 +-
.../src/java/org/apache/solr/core/SolrCores.java | 7 +-
.../java/org/apache/solr/handler/IndexFetcher.java | 2 +-
.../org/apache/solr/handler/admin/ColStatus.java | 5 +-
.../solr/handler/admin/CollectionsHandler.java | 3 +
.../apache/solr/handler/admin/PrepRecoveryOp.java | 41 +-
.../handler/component/RealTimeGetComponent.java | 10 +-
.../apache/solr/rest/ManagedResourceStorage.java | 4 +-
.../apache/solr/schema/ZkIndexSchemaReader.java | 3 +-
.../java/org/apache/solr/servlet/HttpSolrCall.java | 47 +-
.../apache/solr/servlet/SolrDispatchFilter.java | 29 +-
.../org/apache/solr/update/SolrCmdDistributor.java | 12 +-
.../org/apache/solr/update/SolrIndexWriter.java | 3 +-
.../src/java/org/apache/solr/update/UpdateLog.java | 2 +-
.../processor/DistributedUpdateProcessor.java | 2 +-
.../processor/DistributedZkUpdateProcessor.java | 27 +-
.../solr/util/plugin/AbstractPluginLoader.java | 57 +-
.../apache/solr/cloud/CollectionsAPISolrJTest.java | 15 +-
.../org/apache/solr/cloud/DeleteReplicaTest.java | 19 +-
.../org/apache/solr/cloud/DeleteShardTest.java | 2 +
.../solr/cloud/FullSolrCloudDistribCmdsTest.java | 2 +-
.../test/org/apache/solr/cloud/RecoveryZkTest.java | 28 +-
.../apache/solr/cloud/SolrCloudBridgeTestCase.java | 2 +-
.../test/org/apache/solr/cloud/SyncSliceTest.java | 5 +-
.../apache/solr/cloud/TestCloudDeleteByQuery.java | 2 +
.../org/apache/solr/cloud/TestCloudRecovery.java | 16 +-
.../org/apache/solr/cloud/TestCloudRecovery2.java | 3 +-
.../org/apache/solr/cloud/TestPullReplica.java | 7 +-
.../org/apache/solr/cloud/TestSegmentSorting.java | 2 +-
.../cloud/TestWaitForStateWithJettyShutdowns.java | 2 +-
.../api/collections/CollectionReloadTest.java | 1 +
.../CreateCollectionsIndexAndRestartTest.java | 14 +-
.../solr/core/CachingDirectoryFactoryTest.java | 2 +-
.../test/org/apache/solr/core/TestConfigSets.java | 4 +-
.../solr/handler/component/SearchHandlerTest.java | 2 +
.../org/apache/solr/rest/SolrRestletTestBase.java | 2 +
.../rest/schema/TestUniqueKeyFieldResource.java | 3 +-
solr/server/etc/jetty-http.xml | 2 +-
solr/server/etc/jetty-https.xml | 2 +-
.../apache/solr/client/solrj/cloud/ShardTerms.java | 7 +-
.../client/solrj/impl/BaseCloudSolrClient.java | 175 ++--
.../solr/client/solrj/impl/Http2SolrClient.java | 25 +-
.../apache/solr/common/cloud/DocCollection.java | 17 +-
.../java/org/apache/solr/common/cloud/Replica.java | 1 -
.../java/org/apache/solr/common/cloud/Slice.java | 5 +-
.../org/apache/solr/common/cloud/SolrZkClient.java | 5 +
.../apache/solr/common/cloud/ZkStateReader.java | 913 ++++++++++++---------
.../solr/common/util/ObjectReleaseTracker.java | 2 +-
.../solr/common/util/SolrQueuedThreadPool.java | 2 +-
.../src/java/org/apache/solr/SolrTestCase.java | 2 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 5 +-
63 files changed, 1532 insertions(+), 1116 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 0784133..4eede2c 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -20,6 +20,7 @@ import org.apache.lucene.util.Constants;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SocketProxy;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -720,7 +721,7 @@ public class JettySolrRunner implements Closeable {
ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
try {
- if (!reader.isClosed() && reader.getZkClient().isConnected()) {
+ if (reader != null && !reader.isClosed() && reader.getZkClient().isConnected()) {
reader.waitForLiveNodes(10, TimeUnit.SECONDS, (n) -> !n.contains(nodeName));
}
} catch (InterruptedException e) {
@@ -966,7 +967,7 @@ public class JettySolrRunner implements Closeable {
public void close() throws IOException {
try {
zkClient.removeWatches(ZkStateReader.COLLECTIONS_ZKNODE, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 71873ce..942c6d7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -282,7 +282,7 @@ public class LeaderElector implements Closeable {
if (success) {
state = LEADER;
} else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed becoming leader");
+ log.warn("Failed becoming leader {}", context.leaderProps);
}
} finally {
if (!success) {
@@ -451,7 +451,7 @@ public class LeaderElector implements Closeable {
}
private boolean shouldRejectJoins() {
- return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || isClosed;
+ return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || zkClient.isClosed();
}
@Override
@@ -580,7 +580,7 @@ public class LeaderElector implements Closeable {
this.closed = true;
try {
zkClient.removeWatches(watchedNode, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 5ae5e4b..384d486 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -69,11 +69,11 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -789,7 +789,7 @@ public class Overseer implements SolrCloseable {
private void closeWatcher() {
try {
zkController.getZkClient().removeWatches(path, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
@@ -808,7 +808,7 @@ public class Overseer implements SolrCloseable {
zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
startItems = super.getItems();
- log.info("Overseer found entries on start {}", startItems);
+ log.info("Overseer found entries on start {} {}", startItems, path);
if (startItems.size() > 0) {
processQueueItems(startItems, true);
}
@@ -816,11 +816,12 @@ public class Overseer implements SolrCloseable {
@Override
protected void processQueueItems(List<String> items, boolean onStart) {
- if (closed) return;
+ //if (closed) return;
List<String> fullPaths = new ArrayList<>(items.size());
CountDownLatch delCountDownLatch = null;
ourLock.lock();
String forceWrite = null;
+ boolean wroteUpdates = false;
try {
if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
for (String item : items) {
@@ -829,16 +830,6 @@ public class Overseer implements SolrCloseable {
Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
- if (fullPaths.size() > 0) {
- if (!zkController.getZkClient().isClosed()) {
- try {
- delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
- } catch (Exception e) {
- log.warn("Failed deleting processed items", e);
- }
- }
- }
-
final List<StateEntry> shardStateCollections = new ArrayList<>();
for (byte[] item : data.values()) {
@@ -846,13 +837,14 @@ public class Overseer implements SolrCloseable {
try {
StateEntry entry = new StateEntry();
entry.message = message;
+ log.debug("add state update {}", message);
shardStateCollections.add(entry);
} catch (Exception e) {
log.error("Overseer state update queue processing failed", e);
}
}
- Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates = new HashMap<>();
+ Map<String,ConcurrentHashMap<String,ZkStateWriter.StateUpdate>> collStateUpdates = new ConcurrentHashMap<>();
for (Overseer.StateEntry sentry : shardStateCollections) {
try {
@@ -875,60 +867,69 @@ public class Overseer implements SolrCloseable {
}
Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
String collId = Long.toString(docColl.getId());
- List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+ ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
if (updates == null) {
- updates = new ArrayList<>();
+ updates = new ConcurrentHashMap<>( );
collStateUpdates.put(collId, updates);
}
List<Replica> replicas = docColl.getReplicas();
for (Replica replica : replicas) {
if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
- if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
+ if (log.isDebugEnabled()) log.debug("set down node operation {} for replica {}", op, replica);
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.id = replica.getId();
update.state = Replica.State.getShortState(Replica.State.DOWN);
- updates.add(update);
+ updates.put(update.id, update);
}
}
});
} else if (OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry.getKey()))) {
Overseer.this.zkStateWriter.getCS().forEach((coll, docColl) -> {
String collId = Long.toString(docColl.getId());
- List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+ ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
if (updates == null) {
- updates = new ArrayList<>();
+ updates = new ConcurrentHashMap<>();
collStateUpdates.put(collId, updates);
}
List<Replica> replicas = docColl.getReplicas();
for (Replica replica : replicas) {
if (replica.getNodeName().equals(stateUpdateEntry.getValue())) {
- if (log.isDebugEnabled()) log.debug("set node operation {} for replica {}", op, replica);
+ if (log.isDebugEnabled()) log.debug("set recovery node operation {} for replica {}", op, replica);
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.id = replica.getId();
update.state = Replica.State.getShortState(Replica.State.RECOVERING);
- updates.add(update);
+ updates.put(update.id, update);
}
}
});
- } else {
- // if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
- String id = stateUpdateEntry.getKey();
+ }
- String stateString = (String) stateUpdateEntry.getValue();
- if (log.isDebugEnabled()) {
- log.debug("stateString={}", stateString);
- }
- String collId = id.substring(0, id.indexOf('-'));
- List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
- if (updates == null) {
- updates = new ArrayList<>();
- collStateUpdates.put(collId, updates);
+ for (Map.Entry<String,Object> stateUpdateEntry2 : stateUpdateMessage.getProperties().entrySet()) {
+ // if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(stateUpdateEntry.getKey()));
+ if (OverseerAction.DOWNNODE.equals(OverseerAction.get(stateUpdateEntry2.getKey())) || OverseerAction.RECOVERYNODE.equals(OverseerAction.get(stateUpdateEntry2.getKey()))) {
+ continue;
}
+ String id = stateUpdateEntry2.getKey();
+
+ String stateString = (String) stateUpdateEntry2.getValue();
+
+ log.trace("stateString={}", stateString);
+
+ try {
+ String collId = id.substring(0, id.indexOf('-'));
+ ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+ if (updates == null) {
+ updates = new ConcurrentHashMap<>();
+ collStateUpdates.put(collId, updates);
+ }
- ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
- update.id = id;
- update.state = stateString;
- updates.add(update);
+ ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
+ update.id = id;
+ update.state = stateString;
+ updates.put(id, update);
+ } catch (Exception e) {
+ log.error("error processing state update {} {}", id, stateString);
+ }
}
}
@@ -945,9 +946,9 @@ public class Overseer implements SolrCloseable {
Long collIdLong = zkStateWriter.getCS().get(collection).getId();
if (collIdLong != null) {
String collId = Long.toString(collIdLong);
- List<ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
+ ConcurrentHashMap<String,ZkStateWriter.StateUpdate> updates = collStateUpdates.get(collId);
if (updates == null) {
- updates = new ArrayList<>();
+ updates = new ConcurrentHashMap<>();
collStateUpdates.put(collId, updates);
}
@@ -958,7 +959,7 @@ public class Overseer implements SolrCloseable {
ZkStateWriter.StateUpdate update = new ZkStateWriter.StateUpdate();
update.sliceState = (String) stateUpdateEntry.getValue();
update.sliceName = stateUpdateEntry.getKey();
- updates.add(update);
+ updates.put(update.sliceName, update);
}
}
}
@@ -989,11 +990,24 @@ public class Overseer implements SolrCloseable {
if (collections.size() == 0 && forceWrite != null) {
overseer.writePendingUpdates(forceWrite);
}
-
- } finally {
+ wroteUpdates = true;
+ } catch (Exception e) {
+ log.error("Exception handling Overseer state updates",e);
+ } finally {
try {
+ if (fullPaths.size() > 0 && wroteUpdates) {
+ if (!zkController.getZkClient().isClosed()) {
+ try {
+ delCountDownLatch = zkController.getZkClient().delete(fullPaths, false);
+ } catch (Exception e) {
+ log.warn("Failed deleting processed items", e);
+ }
+ }
+ }
+
if (delCountDownLatch != null) {
try {
+
boolean success = delCountDownLatch.await(10, TimeUnit.SECONDS);
if (log.isDebugEnabled()) log.debug("done waiting on latch, success={}", success);
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 618430c..774cd74 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -17,6 +17,7 @@
package org.apache.solr.cloud;
import com.codahale.metrics.Timer;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -214,7 +215,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
public void close() {
try {
zkClient.removeWatches(path, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 88e138e..013e285 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -77,6 +77,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class RecoveryStrategy implements Runnable, Closeable {
+ private final String collection;
+ private final String shard;
private volatile CountDownLatch latch;
private volatile ReplicationHandler replicationHandler;
private volatile Http2SolrClient recoveryOnlyClient;
@@ -107,7 +109,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
private volatile int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer
.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 0);
- private volatile int maxRetries = 500;
+ private volatile int maxRetries = Integer.getInteger("solr.recovery.maxretries", 500);
private volatile int startingRecoveryDelayMilliSeconds = Integer
.getInteger("solr.cloud.starting-recovery-delay-milli-seconds", 0);
@@ -134,6 +136,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
// ObjectReleaseTracker.track(this);
this.cc = cc;
this.coreName = cd.getName();
+ this.collection = cd.getCloudDescriptor().getCollectionName();
+ this.shard = cd.getCloudDescriptor().getShardId();
this.recoveryListener = recoveryListener;
zkController = cc.getZkController();
@@ -353,13 +357,21 @@ public class RecoveryStrategy implements Runnable, Closeable {
// expected
}
- Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 3000, false);
+ LeaderElector leaderElector = zkController.getLeaderElector(coreName);
- if (leader != null && leader.getName().equals(coreName)) {
+ if (leaderElector != null && leaderElector.isLeader()) {
log.info("We are the leader, STOP recovery");
close = true;
return;
}
+
+ Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), Integer.getInteger("solr.getleader.looptimeout", 8000));
+
+ if (leader != null && leader.getName().equals(coreName)) {
+ log.info("We are the leader in cluster state, REPEAT recovery");
+ Thread.sleep(50);
+ continue;
+ }
if (core.isClosing() || core.getCoreContainer().isShutDown()) {
log.info("We are closing, STOP recovery");
close = true;
@@ -408,8 +420,21 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
- leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 3000, true);
+ LeaderElector leaderElector = zkController.getLeaderElector(coreName);
+
+ if (leaderElector != null && leaderElector.isLeader()) {
+ log.info("We are the leader, STOP recovery");
+ close = true;
+ return false;
+ }
+
+ leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), Integer.getInteger("solr.getleader.looptimeout", 8000));
+ if (leader != null && leader.getName().equals(coreName)) {
+ log.info("We are the leader in cluster state, REPEAT recovery");
+ Thread.sleep(50);
+ continue;
+ }
if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
@@ -603,15 +628,22 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+ LeaderElector leaderElector = zkController.getLeaderElector(coreName);
- leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 3000, true);
-
- if (leader != null && leader.getName().equals(coreName)) {
+ if (leaderElector != null && leaderElector.isLeader()) {
log.info("We are the leader, STOP recovery");
close = true;
return false;
}
+ leader = zkController.getZkStateReader().getLeaderRetry(core.getCoreDescriptor().getCollectionName(), core.getCoreDescriptor().getCloudDescriptor().getShardId(), Integer.getInteger("solr.getleader.looptimeout", 8000));
+
+ if (leader != null && leader.getName().equals(coreName)) {
+ log.info("We are the leader in cluster state, REPEAT recovery");
+ Thread.sleep(50);
+ continue;
+ }
+
log.debug("Begin buffering updates. core=[{}]", coreName);
// recalling buffer updates will drop the old buffer tlog
ulog.bufferUpdates();
@@ -676,13 +708,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
didReplication = true;
try {
- // try {
- // if (prevSendPreRecoveryHttpUriRequest != null) {
- // prevSendPreRecoveryHttpUriRequest.cancel();
- // }
- // } catch (NullPointerException e) {
- // // okay
- // }
+ try {
+ if (prevSendPreRecoveryHttpUriRequest != null) {
+ prevSendPreRecoveryHttpUriRequest.cancel();
+ }
+ } catch (NullPointerException e) {
+ // okay
+ }
+ log.debug("Begin buffering updates. core=[{}]", coreName);
+ // recalling buffer updates will drop the old buffer tlog
+ ulog.bufferUpdates();
sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), zkStateReader.getClusterState().
getCollection(core.getCoreDescriptor().getCollectionName()).getSlice(cloudDesc.getShardId()), core.getCoreDescriptor());
@@ -906,8 +941,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
return close || cc.isShutDown();
}
- final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice, CoreDescriptor coreDescriptor)
- throws SolrServerException, IOException {
+ final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice, CoreDescriptor coreDescriptor) {
if (coreDescriptor.getCollectionName() == null) {
throw new IllegalStateException("Collection name cannot be null");
@@ -923,7 +957,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Sending prep recovery command to {} for leader={} params={}", leaderBaseUrl, leaderCoreName, prepCmd.getParams());
int conflictWaitMs = zkController.getLeaderConflictResolveWait();
- int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "1000"));
+ int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
try (Http2SolrClient client = new Http2SolrClient.Builder(leaderBaseUrl).withHttpClient(cc.getUpdateShardHandler().
getRecoveryOnlyClient()).idleTimeout(readTimeout).markInternalRequest().build()) {
@@ -931,20 +965,19 @@ public class RecoveryStrategy implements Runnable, Closeable {
prepCmd.setBasePath(leaderBaseUrl);
latch = new CountDownLatch(1);
- Cancellable result = client.asyncRequest(prepCmd, null, new NamedListAsyncListener(latch));
+ Cancellable result = client.asyncRequest(prepCmd, null, new NamedListAsyncListener(latch, leaderCoreName));
try {
prevSendPreRecoveryHttpUriRequest = result;
try {
- boolean success = latch.await(15, TimeUnit.SECONDS);
+ boolean success = latch.await(readTimeout, TimeUnit.MILLISECONDS);
if (!success) {
- result.cancel();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Timeout waiting for prep recovery cmd on leader");
+ //result.cancel();
+ log.warn("Timeout waiting for prep recovery cmd on leader {}", leaderCoreName);
}
} catch (InterruptedException e) {
close = true;
ParWork.propagateInterrupt(e);
} finally {
- prevSendPreRecoveryHttpUriRequest = null;
latch = null;
}
} finally {
@@ -953,12 +986,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
- private static class NamedListAsyncListener implements AsyncListener<NamedList<Object>> {
+ private class NamedListAsyncListener implements AsyncListener<NamedList<Object>> {
private final CountDownLatch latch;
+ private final String leaderCoreName;
- public NamedListAsyncListener(CountDownLatch latch) {
+ public NamedListAsyncListener(CountDownLatch latch, String leaderCoreName) {
this.latch = latch;
+ this.leaderCoreName = leaderCoreName;
}
@Override
@@ -968,15 +1003,46 @@ public class RecoveryStrategy implements Runnable, Closeable {
} catch (NullPointerException e) {
}
+ prevSendPreRecoveryHttpUriRequest = null;
}
@Override
public void onFailure(Throwable throwable, int code) {
- try {
- latch.countDown();
- } catch (NullPointerException e) {
+ log.info("failed sending prep recovery cmd to leader");
+
+ if (throwable.getMessage().contains("Not the valid leader")) {
+ try {
+ try {
+ Thread.sleep(250);
+ cc.getZkController().getZkStateReader().waitForState(RecoveryStrategy.this.collection, 3, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ Slice slice = collectionState.getSlice(shard);
+ if (slice == null) {
+ return false;
+ }
+ if (slice.getLeader() == null) {
+ return false;
+ }
+ if (slice.getLeader().getName() == leaderCoreName) {
+ return false;
+ }
+ return true;
+ });
+ } catch (Exception e) {
+ }
+ } finally {
+ try {
+ latch.countDown();
+ } catch (NullPointerException e) {
+
+ }
+ prevSendPreRecoveryHttpUriRequest = null;
+ }
}
+
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 8491334..8fe7abb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -17,7 +17,6 @@
package org.apache.solr.cloud;
import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
@@ -27,8 +26,8 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
public class StatePublisher implements Closeable {
private static final Logger log = LoggerFactory
@@ -62,10 +60,10 @@ public class StatePublisher implements Closeable {
public static class NoOpMessage extends ZkNodeProps {
}
-
+ static final String PREFIX = "qn-";
public static final NoOpMessage TERMINATE_OP = new NoOpMessage();
- private final BlockingArrayQueue<ZkNodeProps> workQueue = new BlockingArrayQueue(64, 16);
+ private final ArrayBlockingQueue<ZkNodeProps> workQueue = new ArrayBlockingQueue<>(1024, true);
private final ZkDistributedQueue overseerJobQueue;
private volatile Worker worker;
private volatile Future<?> workerFuture;
@@ -80,50 +78,60 @@ public class StatePublisher implements Closeable {
@Override
public void run() {
- while (!terminated && !zkStateReader.getZkClient().isClosed()) {
- if (!zkStateReader.getZkClient().isConnected()) {
- try {
- zkStateReader.getZkClient().getConnectionManager().waitForConnected(5000);
- } catch (TimeoutException e) {
- continue;
- } catch (InterruptedException e) {
- log.error("publisher interrupted", e);
- }
- continue;
- }
+ while (!terminated) {
+// if (!zkStateReader.getZkClient().isConnected()) {
+// try {
+// zkStateReader.getZkClient().getConnectionManager().waitForConnected(5000);
+// } catch (TimeoutException e) {
+// continue;
+// } catch (InterruptedException e) {
+// log.error("publisher interrupted", e);
+// }
+// continue;
+// }
ZkNodeProps message = null;
ZkNodeProps bulkMessage = new ZkNodeProps();
bulkMessage.getProperties().put(OPERATION, "state");
+ int pollTime = 250;
try {
try {
- message = workQueue.poll(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
-
+ log.debug("State publisher will poll for 5 seconds");
+ message = workQueue.poll(5000, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ log.warn("state publisher hit exception polling", e);
}
if (message != null) {
log.debug("Got state message " + message);
+
if (message == TERMINATE_OP) {
log.debug("State publish is terminated");
terminated = true;
- return;
} else {
- bulkMessage(message, bulkMessage);
+ if (bulkMessage(message, bulkMessage)) {
+ pollTime = 20;
+ } else {
+ pollTime = 150;
+ }
}
- while (!terminated) {
+ while (true) {
try {
- message = workQueue.poll(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.warn("state publisher interrupted", e);
- return;
+ log.debug("State publisher will poll for {} ms", pollTime);
+ message = workQueue.poll(pollTime, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ log.warn("state publisher hit exception polling", e);
}
if (message != null) {
if (log.isDebugEnabled()) log.debug("Got state message " + message);
if (message == TERMINATE_OP) {
terminated = true;
} else {
- bulkMessage(message, bulkMessage);
+ if (bulkMessage(message, bulkMessage)) {
+ pollTime = 10;
+ } else {
+ pollTime = 25;
+ }
}
} else {
break;
@@ -133,20 +141,21 @@ public class StatePublisher implements Closeable {
if (bulkMessage.getProperties().size() > 1) {
processMessage(bulkMessage);
+ } else {
+ log.debug("No messages to publish, loop");
}
- } catch (AlreadyClosedException e) {
- log.info("StatePublisher run loop hit AlreadyClosedException, exiting ...");
- return;
+ if (terminated) {
+ log.info("State publisher has terminated");
+ break;
+ }
} catch (Exception e) {
- log.error("Exception in StatePublisher run loop, exiting", e);
- return;
+ log.error("Exception in StatePublisher run loop", e);
}
}
}
- private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
- if (log.isDebugEnabled()) log.debug("Bulk state zkNodeProps={} bulkMessage={}", zkNodeProps, bulkMessage);
+ private boolean bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.DOWNNODE) {
String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
//clearStatesForNode(bulkMessage, nodeName);
@@ -168,7 +177,11 @@ public class StatePublisher implements Closeable {
String line = Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT)));
if (log.isDebugEnabled()) log.debug("bulk publish core={} id={} state={} line={}", core, id, state, line);
bulkMessage.getProperties().put(id, line);
+ if (state.equals(Replica.State.RECOVERING.toString())) {
+ return true;
+ }
}
+ return false;
}
private void clearStatesForNode(ZkNodeProps bulkMessage, String nodeName) {
@@ -195,9 +208,17 @@ public class StatePublisher implements Closeable {
}
private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
- log.debug("Send state updates to Overseer {}", message);
+ log.info("Send state updates to Overseer {}", message);
byte[] updates = Utils.toJSON(message);
- overseerJobQueue.offer(updates);
+
+ zkStateReader.getZkClient().create("/overseer/queue" + "/" + PREFIX, updates, CreateMode.PERSISTENT_SEQUENTIAL, (rc, path, ctx, name, stat) -> {
+ if (rc != 0) {
+ log.error("got zk error deleting path {} {}", path, rc);
+ KeeperException e = KeeperException.create(KeeperException.Code.get(rc), path);
+ log.error("Exception publish state messages path=" + path, e);
+ workQueue.offer(message);
+ }
+ });
}
}
@@ -240,10 +261,11 @@ public class StatePublisher implements Closeable {
CacheEntry lastState = stateCache.get(id);
//&& (System.currentTimeMillis() - lastState.time < 1000) &&
- if (!state.equals("leader") && collection != null && lastState != null && replica != null && !state.equals(lastState.state) && replica.getState().toString().equals(state)) {
- log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
- return;
- }
+ // TODO: needs work
+// if (state.equals(lastState.state)) {
+// log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
+// return;
+// }
}
if (id == null) {
@@ -294,6 +316,7 @@ public class StatePublisher implements Closeable {
}
} else {
+ log.error("illegal state message {}", stateMessage.toString());
throw new IllegalArgumentException(stateMessage.toString());
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 680e57c..c49425c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -21,7 +21,9 @@ import org.apache.solr.client.solrj.cloud.DistributedLock;
import org.apache.solr.client.solrj.cloud.LockListener;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
@@ -50,6 +52,7 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
@@ -59,7 +62,6 @@ import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrCoreInitializationException;
import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.packagemanager.PackageUtils;
@@ -324,6 +326,8 @@ public class ZkController implements Closeable, Runnable {
if (zkController.cc.getAllCoreNames().contains(descriptor.getName())) {
try {
zkController.register(descriptor.getName(), descriptor, afterExpiration);
+ } catch (AlreadyClosedException e) {
+ log.warn("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration, e);
} catch (Exception e) {
log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration, e);
}
@@ -1080,7 +1084,6 @@ public class ZkController implements Closeable, Runnable {
try {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
-
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
this.asyncIdsMap = Overseer.getAsyncIdsMap(zkClient);
@@ -1097,18 +1100,16 @@ public class ZkController implements Closeable, Runnable {
this.sysPropsCacher = new NodesSysPropsCacher(getSolrCloudManager().getNodeStateProvider(), getNodeName(), zkStateReader);
overseerElector = new LeaderElector(this);
//try (ParWork worker = new ParWork(this, false, true)) {
- // start the overseer first as following code may need it's processing
- // worker.collect("startOverseer", () -> {
- ElectionContext context = getOverseerContext();
- if (log.isDebugEnabled()) log.debug("Overseer setting up context {}", context.leaderProps.getNodeName());
- overseerElector.setup(context);
+ // start the overseer first as following code may need its processing
+ // worker.collect("startOverseer", () -> {
+ ElectionContext context = getOverseerContext();
+ if (log.isDebugEnabled()) log.debug("Overseer setting up context {}", context.leaderProps.getNodeName());
+ overseerElector.setup(context);
- log.info("Overseer joining election {}", context.leaderProps.getNodeName());
- overseerElector.joinElection(false);
+ log.info("Overseer joining election {}", context.leaderProps.getNodeName());
+ overseerElector.joinElection(false);
- zkStateReader.forciblyRefreshAllClusterStateSlow();
publishNodeAs(getNodeName(), OverseerAction.RECOVERYNODE);
-
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -1205,16 +1206,18 @@ public class ZkController implements Closeable, Runnable {
}
}
- public void removeEphemeralLiveNode() throws KeeperException {
- log.info("Removing our ephemeral live node");
- String nodeName = getNodeName();
- String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
- try {
- zkClient.delete(nodePath, -1, true, false);
- } catch (NoNodeException | SessionExpiredException e) {
- // okay
- } catch (Exception e) {
- log.warn("Could not remove ephemeral live node {}", nodePath, e);
+ public void removeEphemeralLiveNode() {
+ if (zkClient.isAlive()) {
+ log.info("Removing our ephemeral live node");
+ String nodeName = getNodeName();
+ String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+ try {
+ zkClient.delete(nodePath, -1, true, false);
+ } catch (NoNodeException | SessionExpiredException e) {
+ // okay
+ } catch (Exception e) {
+ log.warn("Could not remove ephemeral live node {}", nodePath, e);
+ }
}
}
@@ -1255,6 +1258,7 @@ public class ZkController implements Closeable, Runnable {
}
MDCLoggingContext.setCoreName(desc.getName());
ZkShardTerms shardTerms = null;
+ // LeaderElector leaderElector = null;
LeaderElector leaderElector = null;
try {
final String baseUrl = getBaseUrl();
@@ -1262,17 +1266,6 @@ public class ZkController implements Closeable, Runnable {
final String collection = cloudDesc.getCollectionName();
final String shardId = cloudDesc.getShardId();
- log.debug("Register SolrCore, core={} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
-
- DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
- if (docCollection != null) {
- Replica replica = docCollection.getReplica(coreName);
- if (replica != null && !baseUrl.equals(replica.getBaseUrl())) {
- log.error("IllegalStateException wrong base url for this node in replica entry replica={}", replica);
- //throw new IllegalArgumentException("wrong base url for this node in replica entry baseUrl=" + baseUrl + " replica=" + replica);
- }
- }
-
log.debug("Register replica - core={} id={} address={} collection={} shard={} type={}", coreName, desc.getCoreProperties().get("id"), baseUrl, collection, shardId, cloudDesc.getReplicaType());
log.debug("Register terms for replica {}", coreName);
@@ -1280,22 +1273,22 @@ public class ZkController implements Closeable, Runnable {
registerShardTerms(collection, cloudDesc.getShardId(), coreName);
log.info("Create leader elector for replica {}", coreName);
- leaderElector = leaderElectors.get(coreName);
- if (leaderElector == null) {
- leaderElector = new LeaderElector(this);
- LeaderElector oldElector = leaderElectors.putIfAbsent(coreName, leaderElector);
-
- if (oldElector != null) {
- IOUtils.closeQuietly(leaderElector);
- }
-
- if (cc.isShutDown()) {
- IOUtils.closeQuietly(leaderElector);
- IOUtils.closeQuietly(oldElector);
- IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
- throw new AlreadyClosedException();
- }
- }
+// leaderElector = leaderElectors.get(coreName);
+// if (leaderElector == null) {
+// leaderElector = new LeaderElector(this);
+// LeaderElector oldElector = leaderElectors.putIfAbsent(coreName, leaderElector);
+//
+// if (oldElector != null) {
+// IOUtils.closeQuietly(leaderElector);
+// }
+//
+// if (cc.isShutDown()) {
+// IOUtils.closeQuietly(leaderElector);
+// IOUtils.closeQuietly(oldElector);
+// IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
+// throw new AlreadyClosedException();
+// }
+// }
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false; //replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
@@ -1309,19 +1302,56 @@ public class ZkController implements Closeable, Runnable {
log.info("Wait to see leader for {}, {}", collection, shardId);
String leaderName = null;
- for (int i = 0; i < 60; i++) {
+ for (int i = 0; i < 20; i++) {
if (isClosed() || isDcCalled() || cc.isShutDown()) {
throw new AlreadyClosedException();
}
+ leaderElector = leaderElectors.get(coreName);
- if (leaderElector.isLeader()) {
+ if (leaderElector != null && leaderElector.isLeader()) {
leaderName = coreName;
break;
}
try {
- Replica leader = zkStateReader.getLeaderRetry(collection, shardId, Integer.getInteger("solr.getleader.looptimeout", 10000), true);
+ Replica leader = zkStateReader.getLeaderRetry(collection, shardId, Integer.getInteger("solr.getleader.looptimeout", 5000));
leaderName = leader.getName();
+ boolean isLeader = leaderName.equals(coreName);
+
+ if (isLeader) {
+ if (leaderElector != null && leaderElector.isLeader()) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ } else {
+ boolean stop = true;
+ CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+ prepCmd.setCoreName(leader.getName());
+ prepCmd.setLeaderName(leader.getName());
+ prepCmd.setCollection(collection);
+ prepCmd.setShardId(shardId);
+
+ int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
+
+ try (Http2SolrClient client = new Http2SolrClient.Builder(leader.getBaseUrl()).idleTimeout(readTimeout).withHttpClient(cc.getUpdateShardHandler().getTheSharedHttpClient()).markInternalRequest().build()) {
+
+ prepCmd.setBasePath(leader.getBaseUrl());
+
+ try {
+ NamedList<Object> result = client.request(prepCmd);
+ } catch (Exception e) {
+ log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+ stop = false;
+ }
+ }
+ if (stop) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+
} catch (TimeoutException timeoutException) {
if (isClosed() || isDcCalled() || cc.isShutDown()) {
throw new AlreadyClosedException();
@@ -1332,7 +1362,7 @@ public class ZkController implements Closeable, Runnable {
}
if (leaderName == null) {
- log.error("No leader found while trying to register " + coreName + " with zookeeper");
+ log.error("No leader found while trying to register " + coreName + " with zookeeper collection={}", zkStateReader.getCollectionOrNull(collection));
throw new SolrException(ErrorCode.SERVER_ERROR, "No leader found while trying to register " + coreName + " with zookeeper");
}
@@ -1502,7 +1532,7 @@ public class ZkController implements Closeable, Runnable {
}
- private void joinElection(CoreDescriptor cd, boolean joinAtHead) {
+ private LeaderElector joinElection(CoreDescriptor cd, boolean joinAtHead) {
log.info("joinElection {}", cd.getName());
// look for old context - if we find it, cancel it
String collection = cd.getCloudDescriptor().getCollectionName();
@@ -1530,7 +1560,7 @@ public class ZkController implements Closeable, Runnable {
LeaderElector leaderElector;
if (isDcCalled() || isClosed) {
- return;
+ return null;
}
leaderElector = leaderElectors.get(replica.getName());
if (leaderElector == null) {
@@ -1548,6 +1578,8 @@ public class ZkController implements Closeable, Runnable {
leaderElector.setup(context);
log.info("Joining election ...");
leaderElector.joinElection( false, joinAtHead);
+
+ return leaderElector;
}
@@ -1598,24 +1630,24 @@ public class ZkController implements Closeable, Runnable {
// props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
props.put(ZkStateReader.COLLECTION_PROP, collection);
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
- try {
- if (core.getDirectoryFactory().isSharedStorage()) {
- // MRM TODO: currently doesn't publish anywhere
- if (core.getDirectoryFactory().isSharedStorage()) {
- props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
- props.put("dataDir", core.getDataDir());
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog != null) {
- props.put("ulogDir", ulog.getLogDir());
- }
- }
- }
- } catch (SolrCoreInitializationException ex) {
- // The core had failed to initialize (in a previous request, not this one), hence nothing to do here.
- if (log.isInfoEnabled()) {
- log.info("The core '{}' had failed to initialize before.", cd.getName());
- }
- }
+// try {
+// if (core.getDirectoryFactory().isSharedStorage()) {
+// // MRM TODO: currently doesn't publish anywhere
+// if (core.getDirectoryFactory().isSharedStorage()) {
+// props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
+// props.put("dataDir", core.getDataDir());
+// UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+// if (ulog != null) {
+// props.put("ulogDir", ulog.getLogDir());
+// }
+// }
+// }
+// } catch (SolrCoreInitializationException ex) {
+// // The core had failed to initialize (in a previous request, not this one), hence nothing to do here.
+// if (log.isInfoEnabled()) {
+// log.info("The core '{}' had failed to initialize before.", cd.getName());
+// }
+// }
// pull replicas are excluded because their terms are not considered
if ((state == Replica.State.RECOVERING || state == Replica.State.BUFFERING) && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
@@ -1855,6 +1887,14 @@ public class ZkController implements Closeable, Runnable {
}
}
+ public void clearStatePublisher() {
+ this.statePublisher.clearStatCache();
+ }
+
+ public void clearCachedState(String coreName) {
+ this.statePublisher.clearStatCache(coreName);
+ }
+
public int getClientTimeout() {
return clientTimeout;
}
@@ -2078,7 +2118,7 @@ public class ZkController implements Closeable, Runnable {
if (log.isDebugEnabled()) log.debug("No more listeners for config directory [{}]", confDir);
try {
zkClient.removeWatches(COLLECTIONS_ZKNODE, confListeners.watcher, Watcher.WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index be522c6..9438f80 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -398,29 +398,30 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
if (waitForFinalState && (createNodeSet == null || !createNodeSet.equals(ZkStateReader.CREATE_NODE_SET_EMPTY))) {
try {
zkStateReader.waitForState(collectionName, CREATE_COLLECTION_TIMEOUT, TimeUnit.SECONDS, (l, c) -> {
+ log.debug("notified cmd {}", c);
if (c == null) {
return false;
}
for (String name : coresToCreate.keySet()) {
- if (log.isTraceEnabled()) log.trace("look for core {}", name);
+ log.debug("look for core {} {} {} {}", name, c.getReplica(name), c.getReplica(name).getState(), c.getReplica(name).getState() != Replica.State.ACTIVE);
if (c.getReplica(name) == null || c.getReplica(name).getState() != Replica.State.ACTIVE) {
- if (log.isTraceEnabled()) log.trace("not the right replica or state {}", c.getReplica(name));
+ log.debug("not the right replica or state {}", c.getReplica(name));
return false;
}
}
Collection<Slice> slices = c.getSlices();
if (slices.size() < shardNames.size()) {
- if (log.isTraceEnabled()) log.trace("wrong number slices {} vs {}", slices.size(), shardNames.size());
+ log.debug("wrong number slices {} vs {}", slices.size(), shardNames.size());
return false;
}
for (Slice slice : slices) {
- if (log.isTraceEnabled()) log.trace("slice {} leader={}", slice, slice.getLeader());
+ log.debug("slice {} leader={}", slice, slice.getLeader());
if (slice.getLeader() == null || (slice.getLeader() != null && slice.getLeader().getState() != Replica.State.ACTIVE)) {
- if (log.isTraceEnabled()) log.trace("no leader found for slice {}", slice.getName());
+ log.debug("no leader found for slice {}", slice.getName());
return false;
}
}
- if (log.isTraceEnabled()) log.trace("return true, everything active");
+ log.debug("return true, everything active");
return true;
});
} catch (InterruptedException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 9cb3980..14985f3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -116,6 +116,7 @@ public class DeleteReplicaCmd implements Cmd {
ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
ShardHandler finalShardHandler = shardHandler;
String finalCollectionName = collectionName;
+ String finalCollectionName2 = collectionName;
response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
@Override
public CollectionCmdResponse.Response call() {
@@ -134,6 +135,7 @@ public class DeleteReplicaCmd implements Cmd {
log.error("Exception waiting for delete replica response");
}
}
+ ocmh.overseer.getZkStateWriter().writePendingUpdates(finalCollectionName2);
Set<String> replicas = (Set<String>) resp.results.get("replicas_deleted");
for (String replica : replicas) {
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index c6f98da..5673a6d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -1245,7 +1245,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
this.closed = true;
try {
zkClient.removeWatches(watchPath, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index cd80b5d..5c98e6b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -36,12 +37,10 @@ import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
-import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
@@ -64,7 +63,7 @@ public class ZkStateWriter {
protected volatile Stats stats;
- private final Map<String, ZkNodeProps> stateUpdates = new ConcurrentHashMap<>();
+ private final Map<String, ConcurrentHashMap> stateUpdates = new ConcurrentHashMap<>();
Map<Long,String> idToCollection = new ConcurrentHashMap<>(128, 0.75f, 16);
@@ -90,7 +89,7 @@ public class ZkStateWriter {
}
- public void enqueueUpdate(DocCollection docCollection, Map<String,List<ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
+ public void enqueueUpdate(DocCollection docCollection, Map<String,ConcurrentHashMap<String,ZkStateWriter.StateUpdate>> collStateUpdates, boolean stateUpdate) throws Exception {
try {
@@ -175,8 +174,8 @@ public class ZkStateWriter {
collState.collLock.unlock();
}
} else {
- if (log.isDebugEnabled()) log.debug("enqueue state change states={}", collStateUpdates);
- for (Map.Entry<String,List<StateUpdate>> entry : collStateUpdates.entrySet()) {
+ log.trace("enqueue state change states={}", collStateUpdates);
+ for (Map.Entry<String,ConcurrentHashMap<String,ZkStateWriter.StateUpdate>> entry : collStateUpdates.entrySet()) {
ColState collState = collLocks.compute(entry.getKey(), (s, reentrantLock) -> {
if (reentrantLock == null) {
@@ -192,12 +191,14 @@ public class ZkStateWriter {
String collection = idToCollection.get(Long.parseLong(collectionId));
if (collection == null) {
log.error("Collection not found by id={} collections={}", collectionId, idToCollection);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
+
+ continue;
+ //throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Collection not found by id=" + collectionId);
}
- ZkNodeProps updates = stateUpdates.get(collection);
+ ConcurrentHashMap updates = stateUpdates.get(collection);
if (updates == null) {
- updates = new ZkNodeProps();
+ updates = new ConcurrentHashMap();
stateUpdates.put(collection, updates);
}
@@ -205,46 +206,91 @@ public class ZkStateWriter {
String csVersion;
if (docColl != null) {
csVersion = Integer.toString(docColl.getZNodeVersion());
- for (StateUpdate state : entry.getValue()) {
+ for (StateUpdate state : entry.getValue().values()) {
if (state.sliceState != null) {
Slice slice = docColl.getSlice(state.sliceName);
if (slice != null) {
slice.setState(Slice.State.getState(state.sliceState));
+ slice.getProperties().put("state", state.sliceState);
}
dirtyStructure.add(collection);
continue;
}
Replica replica = docColl.getReplicaById(state.id);
- log.debug("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
+ log.trace("found existing collection name={}, look for replica={} found={}", collection, state.id, replica);
if (replica != null) {
- String setState = Replica.State.shortStateToState(state.state).toString();
- log.debug("zkwriter publish state={} replica={}", state.state, replica.getName());
- if (setState.equals("leader")) {
- if (log.isDebugEnabled()) {
- log.debug("set leader {}", replica);
- }
+
+ log.trace("zkwriter publish state={} replica={}", state.state, replica.getName());
+ if (state.state.equals("l")) {
+
+ log.trace("set leader {}", replica);
+
Slice slice = docColl.getSlice(replica.getSlice());
- slice.setLeader(replica);
- replica.setState(Replica.State.ACTIVE);
- replica.getProperties().put("leader", "true");
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica r : replicas) {
- if (r != replica) {
- r.getProperties().remove("leader");
+ Map<String,Replica> replicasMap = slice.getReplicasCopy();
+ Map properties = new HashMap(replica.getProperties());
+
+ properties.put("leader", "true");
+ properties.put("state", Replica.State.ACTIVE);
+ // properties.put(replica.getInternalId(), "l");
+ for (Replica r : replicasMap.values()) {
+ if (replica.getName().equals(r.getName())) {
+ continue;
+ }
+ log.trace("process non leader {} {}", r, r.getProperty(ZkStateReader.LEADER_PROP));
+ if ("true".equals(r.getProperties().get(ZkStateReader.LEADER_PROP))) {
+ log.debug("remove leader prop {}", r);
+ Map<String,Object> props = new HashMap<>(r.getProperties());
+ props.remove(ZkStateReader.LEADER_PROP);
+ Replica newReplica = new Replica(r.getName(), props, collection, docColl.getId(), r.getSlice(), overseer.getZkStateReader());
+ replicasMap.put(r.getName(), newReplica);
}
}
- updates.getProperties().put(replica.getInternalId(), "l");
+
+ Replica newReplica = new Replica(replica.getName(), properties, collection, docColl.getId(), replica.getSlice(), overseer.getZkStateReader());
+
+ replicasMap.put(replica.getName(), newReplica);
+
+ Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), collection, docColl.getId(), overseer.getZkStateReader());
+
+ Map<String,Slice> newSlices = docColl.getSlicesCopy();
+ newSlices.put(slice.getName(), newSlice);
+
+ log.trace("add new slice leader={} {} {}", newSlice.getLeader(), newSlice, docColl);
+
+ DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(), docColl.getStateUpdates());
+ cs.put(collection, newDocCollection);
+ docColl = newDocCollection;
+ updates.put(replica.getInternalId(), "l");
dirtyState.add(collection);
} else {
+ String setState = Replica.State.shortStateToState(state.state).toString();
Replica.State s = Replica.State.getState(setState);
- Replica existingLeader = docColl.getSlice(replica).getLeader();
- if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
- docColl.getSlice(replica).setLeader(null);
- }
- updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
- log.debug("set state {} {}", state, replica);
- replica.setState(s);
+
+ log.trace("set state {} {}", state, replica);
+
+ Slice slice = docColl.getSlice(replica.getSlice());
+ Map<String,Replica> replicasMap = slice.getReplicasCopy();
+ Map properties = new HashMap(replica.getProperties());
+
+ properties.put("state", s);
+ properties.remove(ZkStateReader.LEADER_PROP);
+
+ Replica newReplica = new Replica(replica.getName(), properties, collection, docColl.getId(), replica.getSlice(), overseer.getZkStateReader());
+
+ replicasMap.put(replica.getName(), newReplica);
+
+ Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), collection, docColl.getId(), overseer.getZkStateReader());
+
+ Map<String,Slice> newSlices = docColl.getSlicesCopy();
+ newSlices.put(slice.getName(), newSlice);
+
+ log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
+
+ DocCollection newDocCollection = new DocCollection(collection, newSlices, docColl.getProperties(), docColl.getRouter(), docColl.getZNodeVersion(), docColl.getStateUpdates());
+ cs.put(collection, newDocCollection);
+ docColl = newDocCollection;
+ updates.put(replica.getInternalId(), state.state);
dirtyState.add(collection);
}
} else {
@@ -252,24 +298,24 @@ public class ZkStateWriter {
}
}
} else {
- for (StateUpdate state : entry.getValue()) {
- log.warn("Could not find existing collection name={}", collection);
-// String setState = Replica.State.shortStateToState(state.state).toString();
-// if (setState.equals("leader")) {
-// updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), "l");
-// dirtyState.add(collection);
-// } else {
-// Replica.State s = Replica.State.getState(setState);
-// updates.getProperties().put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
-// dirtyState.add(collection);
-// }
+ for (StateUpdate state : entry.getValue().values()) {
+ log.debug("Could not find existing collection name={}", collection);
+ String setState = Replica.State.shortStateToState(state.state).toString();
+ if (setState.equals("l")) {
+ updates.put(state.id.substring(state.id.indexOf('-') + 1), "l");
+ dirtyState.add(collection);
+ } else {
+ Replica.State s = Replica.State.getState(setState);
+ updates.put(state.id.substring(state.id.indexOf('-') + 1), Replica.State.getShortState(s));
+ dirtyState.add(collection);
+ }
}
log.debug("version for state updates 0");
csVersion = "0";
}
if (dirtyState.contains(collection)) {
- updates.getProperties().put("_cs_ver_", csVersion);
+ updates.put("_cs_ver_", csVersion);
}
} finally {
@@ -373,8 +419,10 @@ public class ZkStateWriter {
Stat stat;
try {
+
stat = reader.getZkClient().setData(path, data, finalVersion, true, false);
collection.setZnodeVersion(finalVersion + 1);
+
if (log.isDebugEnabled()) log.debug("set new version {} {}", collection.getName(), stat.getVersion());
} catch (KeeperException.NoNodeException e) {
log.debug("No node found for state.json", e);
@@ -388,13 +436,14 @@ public class ZkStateWriter {
reader.getZkClient().setData(pathSCN, null, -1, true, false);
- ZkNodeProps updates = stateUpdates.get(collection.getName());
+ ConcurrentHashMap updates = stateUpdates.get(collection.getName());
if (updates != null) {
- updates.getProperties().clear();
+ updates.clear();
+ writeStateUpdates(collection, updates);
}
} else if (dirtyState.contains(collection.getName())) {
- ZkNodeProps updates = stateUpdates.get(collection.getName());
+ ConcurrentHashMap updates = stateUpdates.get(collection.getName());
if (updates != null) {
try {
writeStateUpdates(collection, updates);
@@ -423,9 +472,12 @@ public class ZkStateWriter {
}
}
- private void writeStateUpdates(DocCollection collection, ZkNodeProps updates) throws KeeperException, InterruptedException {
+ private void writeStateUpdates(DocCollection collection, ConcurrentHashMap updates) throws KeeperException, InterruptedException {
+ if (updates.size() == 0) {
+ return;
+ }
String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
- if (log.isDebugEnabled()) log.debug("write state updates for collection {} ver={} {}", collection.getName(), updates.get("_cs_ver_"), updates);
+ log.trace("write state updates for collection {} ver={} {}", collection.getName(), updates.get("_cs_ver_"), updates);
try {
reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, false);
} catch (KeeperException.NoNodeException e) {
@@ -449,36 +501,48 @@ public class ZkStateWriter {
public void removeCollection(String collection) {
- log.info("Removing collection from zk state {}", collection);
- ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {
- if (reentrantLock == null) {
- ColState colState = new ColState();
- return colState;
- }
- return reentrantLock;
- });
- collState.collLock.lock();
+ log.debug("Removing collection from zk state {}", collection);
try {
- Long id = null;
- for (Map.Entry<Long, String> entry : idToCollection.entrySet()) {
- if (entry.getValue().equals(collection)) {
- id = entry.getKey();
- break;
+ ColState collState = collLocks.compute(collection, (s, reentrantLock) -> {
+ if (reentrantLock == null) {
+ ColState colState = new ColState();
+ return colState;
+ }
+ return reentrantLock;
+ });
+ collState.collLock.lock();
+ try {
+ Long id = null;
+ for (Map.Entry<Long,String> entry : idToCollection.entrySet()) {
+ if (entry.getValue().equals(collection)) {
+ id = entry.getKey();
+ break;
+ }
+ }
+ if (id != null) {
+ idToCollection.remove(id);
}
- }
- if (id != null) {
- idToCollection.remove(id);
stateUpdates.remove(collection);
+ DocCollection doc = cs.get(collection);
+
+ if (doc != null) {
+ List<Replica> replicas = doc.getReplicas();
+ for (Replica replica : replicas) {
+ overseer.getCoreContainer().getZkController().clearCachedState(replica.getName());
+ }
+ }
+
cs.remove(collection);
assignMap.remove(collection);
dirtyStructure.remove(collection);
dirtyState.remove(collection);
- cs.remove(collection);
+
+ } finally {
+ collState.collLock.unlock();
}
} catch (Exception e) {
- log.error("", e);
- } finally {
- collState.collLock.unlock();
+ log.error("Exception removing collection", e);
+
}
}
@@ -505,57 +569,61 @@ public class ZkStateWriter {
}
public void init() {
+ try {
+ overseer.getCoreContainer().getZkController().clearStatePublisher();
+ ClusterState readerState = reader.getClusterState();
+ if (readerState != null) {
+ reader.forciblyRefreshAllClusterStateSlow();
+ cs.putAll(readerState.copy().getCollectionsMap());
+ }
- ClusterState readerState = reader.getClusterState();
- if (readerState != null) {
- cs.putAll(readerState.copy().getCollectionsMap());
- }
-
- long[] highId = new long[1];
- cs.values().forEach(collection -> {
- String collectionName = collection.getName();
- ColState collState = collLocks.compute(collectionName, (s, colState) -> {
- if (colState == null) {
- ColState cState = new ColState();
- return cState;
- }
- return colState;
- });
- collState.collLock.lock();
- try {
-
- if (collection.getId() > highId[0]) {
- highId[0] = collection.getId();
- }
-
- idToCollection.put(collection.getId(), collection.getName());
-
+ long[] highId = new long[1];
+ cs.values().forEach(collection -> {
+ String collectionName = collection.getName();
+ ColState collState = collLocks.compute(collectionName, (s, colState) -> {
+ if (colState == null) {
+ ColState cState = new ColState();
+ return cState;
+ }
+ return colState;
+ });
+ collState.collLock.lock();
+ try {
- DocAssign docAssign = new DocAssign();
- docAssign.name = collection.getName();
- assignMap.put(docAssign.name, docAssign);
- int max = 1;
- Collection<Slice> slices = collection.getSlices();
- for (Slice slice : slices) {
- Collection<Replica> replicas = slice.getReplicas();
+ if (collection.getId() > highId[0]) {
+ highId[0] = collection.getId();
+ }
- for (Replica replica : replicas) {
- Matcher matcher = Assign.pattern.matcher(replica.getName());
- if (matcher.matches()) {
- int val = Integer.parseInt(matcher.group(1));
- max = Math.max(max, val);
+ idToCollection.put(collection.getId(), collection.getName());
+
+ DocAssign docAssign = new DocAssign();
+ docAssign.name = collection.getName();
+ assignMap.put(docAssign.name, docAssign);
+ int max = 1;
+ Collection<Slice> slices = collection.getSlices();
+ for (Slice slice : slices) {
+ Collection<Replica> replicas = slice.getReplicas();
+
+ for (Replica replica : replicas) {
+ Matcher matcher = Assign.pattern.matcher(replica.getName());
+ if (matcher.matches()) {
+ int val = Integer.parseInt(matcher.group(1));
+ max = Math.max(max, val);
+ }
}
}
+ docAssign.replicaAssignCnt.set(max);
+ } finally {
+ collState.collLock.unlock();
}
- docAssign.replicaAssignCnt.set(max);
- } finally {
- collState.collLock.unlock();
- }
- });
+ });
- ID.set(highId[0]);
+ ID.set(highId[0]);
- if (log.isDebugEnabled()) log.debug("zkStateWriter starting with cs {}", cs);
+ if (log.isDebugEnabled()) log.debug("zkStateWriter starting with cs {}", cs);
+ } catch (Exception e) {
+ log.error("Exception in ZkStateWriter init", e);
+ }
}
private static class DocAssign {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 6775b88..5bab87c 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -35,6 +35,7 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
@@ -139,7 +140,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@@ -674,6 +674,68 @@ public class CoreContainer implements Closeable {
loaded = true;
+
+ List<CoreDescriptor> cds = coresLocator.discover(this);
+
+ checkForDuplicateCoreNames(cds);
+ status |= CORE_DISCOVERY_COMPLETE;
+
+ solrCores.load(loader);
+
+ if (isZooKeeperAware()) {
+ try {
+ zkSys.start(this);
+ } catch (IOException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (KeeperException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ if (isZooKeeperAware()) {
+ try {
+ getZkController().publishNodeAs(getZkController().getNodeName(), OverseerAction.RECOVERYNODE);
+ } catch (Exception e) {
+ log.error("Failed publishing loading core as recovering", e);
+ }
+
+ List<CoreDescriptor> removeCds = new ArrayList<>();
+ for (final CoreDescriptor cd : cds) {
+
+ DocCollection docCollection = getZkController().getClusterState().getCollectionOrNull(cd.getCollectionName());
+ if (docCollection != null) {
+ Replica replica = docCollection.getReplica(cd.getName());
+ if (replica != null && !getZkController().getBaseUrl().equals(replica.getBaseUrl()) || Long.parseLong(cd.getCoreProperty("collId", "0")) != docCollection.getId()) {
+ removeCds.add(cd);
+ log.error("IllegalStateException wrong base url or collection id for this node in replica entry replica={}", replica);
+ try {
+ while (Files.exists(cd.getInstanceDir())) {
+ try {
+ Files.walk(cd.getInstanceDir()).sorted(Comparator.reverseOrder()).forEach(new FileConsumer());
+ } catch (NoSuchFileException | UncheckedIOException e) {
+
+ }
+ }
+ } catch (Exception e) {
+ SolrException.log(log, "Failed to delete instance dir for core:" + cd.getName() + " dir:" + cd.getInstanceDir());
+ }
+
+ }
+ }
+ markCoreAsLoading(cd.getName());
+ String collection = cd.getCollectionName();
+ getZkController().getZkStateReader().registerCore(collection, cd.getName());
+
+ }
+ for (CoreDescriptor removeCd : removeCds) {
+ cds.remove(removeCd);
+ }
+ } else {
+ for (final CoreDescriptor cd : cds) {
+ markCoreAsLoading(cd.getName());
+ }
+ }
+
// Always add $SOLR_HOME/lib to the shared resource loader
Set<String> libDirs = new LinkedHashSet<>();
libDirs.add("lib");
@@ -706,8 +768,6 @@ public class CoreContainer implements Closeable {
try {
- solrCores.load(loader);
-
logging = LogWatcher.newRegisteredLogWatcher(cfg.getLogWatcherConfig(), loader);
hostName = cfg.getNodeName();
@@ -720,16 +780,6 @@ public class CoreContainer implements Closeable {
createHandler(ZK_PATH, ZookeeperInfoHandler.class.getName(), ZookeeperInfoHandler.class);
createHandler(ZK_STATUS_PATH, ZookeeperStatusHandler.class.getName(), ZookeeperStatusHandler.class);
- if (isZooKeeperAware()) {
- try {
- zkSys.start(this);
- } catch (IOException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- }
-
try (ParWork work = new ParWork(this, false, false)) {
boolean enableMetrics = Boolean.parseBoolean(System.getProperty("solr.enableMetrics", "true"));
@@ -852,59 +902,14 @@ public class CoreContainer implements Closeable {
List<Future<SolrCore>> coreLoadFutures = null;
- List<CoreDescriptor> cds = coresLocator.discover(this);
+
coreLoadFutures = new ArrayList<>(cds.size());
if (isZooKeeperAware()) {
cds = CoreSorter.sortCores(this, cds);
}
- checkForDuplicateCoreNames(cds);
- status |= CORE_DISCOVERY_COMPLETE;
- startedLoadingCores = true;
-
- if (isZooKeeperAware()) {
-
- log.info("Waiting to see not ACTIVE states for node on startup ...");
- String nodeName = getZkController().getNodeName();
- for (final CoreDescriptor cd : cds) {
- String collection = cd.getCollectionName();
- getZkController().getZkStateReader().registerCore(collection, cd.getName());
- try {
- getZkController().getZkStateReader().waitForState(collection, 3, TimeUnit.SECONDS, (n, c) -> {
- if (c == null) {
- if (log.isDebugEnabled()) log.debug("Found incorrect state c={}", c);
- return false;
- }
-
-
- List<Replica> replicas = c.getReplicas();
- for (Replica replica : replicas) {
- log.trace("startup replica on node={} replica={}", zkSys.getZkController().getNodeName(), replica);
- if (replica.getNodeName().equals(nodeName)) {
- if (replica.getState().equals(State.ACTIVE)) {
- if (log.isDebugEnabled()) log.debug("Found incorrect state {} {} ourNodeName={} replica={}", replica.getState(), replica.getNodeName(), nodeName, replica);
- return false;
- }
- } else {
- // if (log.isDebugEnabled()) log.debug("Found incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
- }
- }
- return true;
- });
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- return;
- } catch (TimeoutException e) {
- log.error("Timeout", e);
- }
- }
- }
+ startedLoadingCores = true;
- for (final CoreDescriptor cd : cds) {
- if (!cd.isTransient() && cd.isLoadOnStartup()) {
- solrCores.markCoreAsLoading(cd);
- }
- }
if (isZooKeeperAware()) {
zkSys.getZkController().createEphemeralLiveNode();
@@ -917,28 +922,29 @@ public class CoreContainer implements Closeable {
solrCores.addCoreDescriptor(cd);
}
- if (isZooKeeperAware()) {
- String collection = cd.getCollectionName();
-
- if (!zkSys.zkController.getClusterState().hasCollection(collection)) {
- solrCores.markCoreAsNotLoading(cd);
- try {
- coresLocator.delete(this, cd);
- } catch (Exception e) {
- log.error("Exception deleting core.properties file for non existing collection", e);
- }
-
- try {
- unload(cd, cd.getName(),true, true, true);
- } catch (Exception e) {
- log.error("Exception unloading core for non existing collection", e);
- }
- continue;
- }
- }
+ // MRM TODO: look at ids for this
+// if (isZooKeeperAware()) {
+// String collection = cd.getCollectionName();
+//
+// if (!zkSys.zkController.getClusterState().hasCollection(collection)) {
+// solrCores.markCoreAsNotLoading(cd);
+// try {
+// coresLocator.delete(this, cd);
+// } catch (Exception e) {
+// log.error("Exception deleting core.properties file for non existing collection", e);
+// }
+//
+// try {
+// unload(cd, cd.getName(),true, true, true);
+// } catch (Exception e) {
+// log.error("Exception unloading core for non existing collection", e);
+// }
+// continue;
+// }
+// }
if (cd.isLoadOnStartup()) {
-
+ startedLoadingCores = true;
coreLoadFutures.add(solrCoreExecutor.submit(() -> {
SolrCore core = null;
MDCLoggingContext.setCoreName(cd.getName());
@@ -1351,16 +1357,19 @@ public class CoreContainer implements Closeable {
ParWork.propagateInterrupt(ex);
// First clean up any core descriptor, there should never be an existing core.properties file for any core that
// failed to be created on-the-fly.
- coresLocator.delete(this, cd);
- if (isZooKeeperAware() && !preExisitingZkEntry) {
- try {
- getZkController().unregister(coreName, cd.getCollectionName(), cd.getCloudDescriptor().getShardId());
- } catch (Exception e) {
- log.error("", e);
+ try {
+ coresLocator.delete(this, cd);
+ if (isZooKeeperAware() && !preExisitingZkEntry && zkSys.getZkController().getZkClient().isAlive()) {
+ try {
+ getZkController().unregister(coreName, cd.getCollectionName(), cd.getCloudDescriptor().getShardId());
+ } catch (Exception e) {
+ log.error("", e);
+ }
+ }
+ } finally {
+ if (core != null) {
+ core.doClose();
}
- }
- if (core != null) {
- core.closeAndWait();
}
Throwable tc = ex;
@@ -1439,36 +1448,27 @@ public class CoreContainer implements Closeable {
timeValidateCoreNameLoadConfigSet.done();
try {
+ core = new SolrCore(this, dcore, coreConfig);
+ } catch (Exception e) {
+ core = processCoreCreateException(e, dcore, coreConfig);
+ }
- try {
- core = new SolrCore(this, dcore, coreConfig);
- } catch (Exception e) {
- core = processCoreCreateException(e, dcore, coreConfig);
- }
-
- core.start();
+ core.start();
- StopWatch timeRegisterCore = new StopWatch(dcore.getName() + "-registerCore");
- old = registerCore(dcore, core, true);
- registered = true;
- timeRegisterCore.done();
+ StopWatch timeRegisterCore = new StopWatch(dcore.getName() + "-registerCore");
+ registerCore(dcore, core, true);
+ registered = true;
+ timeRegisterCore.done();
- if (isZooKeeperAware()) {
- StopWatch timeKickOffAsyncZkReg = new StopWatch(dcore.getName() + "-kickOffAsyncZkReg");
- if (!newCollection) {
- if (core.getDirectoryFactory().isSharedStorage()) {
- zkSys.getZkController().throwErrorIfReplicaReplaced(dcore);
- }
+ if (isZooKeeperAware()) {
+ StopWatch timeKickOffAsyncZkReg = new StopWatch(dcore.getName() + "-kickOffAsyncZkReg");
+ if (!newCollection) {
+ if (core.getDirectoryFactory().isSharedStorage()) {
+ zkSys.getZkController().throwErrorIfReplicaReplaced(dcore);
}
- ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false));
- timeKickOffAsyncZkReg.done();
}
-
- } catch (Exception e) {
-
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } finally {
- solrCores.markCoreAsNotLoading(dcore);
+ ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false));
+ timeKickOffAsyncZkReg.done();
}
// always kick off recovery if we are in non-Cloud mode
@@ -1480,6 +1480,7 @@ public class CoreContainer implements Closeable {
} catch (Exception e) {
ParWork.propagateInterrupt(e);
log.error("Unable to create SolrCore", e);
+ solrCores.markCoreAsNotLoading(dcore);
coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
if (e instanceof ZkController.NotInClusterStateException && !newCollection) {
// this mostly happen when the core is deleted when this node is down
@@ -1506,31 +1507,10 @@ public class CoreContainer implements Closeable {
SolrCore finalCore1 = core;
try {
solrCoreExecutor.submit(() -> {
- finalCore1.closeAndWait();
- });
- } catch (RejectedExecutionException e) {
- finalCore1.closeAndWait();
- }
- SolrCore finalOld = old;
- if (finalOld != null) {
- try {
- solrCoreExecutor.submit(() -> {
- finalOld.closeAndWait();
- });
- } catch (RejectedExecutionException e) {
- finalOld.closeAndWait();
- }
- }
-
- }
- if (isShutDown) {
- SolrCore finalCore1 = core;
- try {
- solrCoreExecutor.submit(() -> {
- finalCore1.closeAndWait();
+ finalCore1.doClose();
});
} catch (RejectedExecutionException e) {
- finalCore1.closeAndWait();
+ finalCore1.doClose();
}
}
}
@@ -1968,6 +1948,7 @@ public class CoreContainer implements Closeable {
try {
if (isZooKeeperAware()) {
+ getZkController().clearCachedState(name);
if (cd != null) {
try {
zkSys.getZkController().unregister(name, cd.getCollectionName(), cd.getCloudDescriptor().getShardId());
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index f380a0f..34f028d 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -749,13 +749,13 @@ public final class SolrCore implements SolrInfoBean, Closeable {
coreContainer.solrCoreExecutor.submit(() -> {
try {
log.warn("Closing failed SolrCore from failed reload");
- finalCore.closeAndWait();
+ finalCore.close();
} catch (Exception e) {
log.error("Exception waiting for core to close on reload failure", e);
}
});
} catch (RejectedExecutionException e) {
- finalCore.closeAndWait();
+ finalCore.close();
}
}
}
@@ -2479,6 +2479,9 @@ public final class SolrCore implements SolrInfoBean, Closeable {
tmp = new SolrIndexSearcher(this, newIndexDir, getLatestSchema(), (realtime ? "realtime" : "main"), newReader, true, !realtime, true,
directoryFactory);
} else {
+ if (coreContainer.isShutDown() || closing) {
+ throw new AlreadyClosedException();
+ }
RefCounted<IndexWriter> writer = getSolrCoreState().getIndexWriter(this);
DirectoryReader newReader = null;
try {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index 78a20d5..ae29756 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -86,7 +86,8 @@ class SolrCores implements Closeable {
}
public void load(SolrResourceLoader loader) {
- transientCoreCache = TransientSolrCoreCacheFactory.newInstance(loader, container);
+ // TODO
+ // transientCoreCache = TransientSolrCoreCacheFactory.newInstance(loader, container);
}
// We are shutting down. You can't hold the lock on the various lists of cores while they shut down, so we need to
@@ -462,8 +463,8 @@ class SolrCores implements Closeable {
public TransientSolrCoreCache getTransientCacheHandler() {
if (transientCoreCache == null) {
- log.error("No transient handler has been defined. Check solr.xml to see if an attempt to provide a custom {}"
- , "TransientSolrCoreCacheFactory was done incorrectly since the default should have been used otherwise.");
+// log.error("No transient handler has been defined. Check solr.xml to see if an attempt to provide a custom {}"
+// , "TransientSolrCoreCacheFactory was done incorrectly since the default should have been used otherwise.");
return null;
}
return transientCoreCache.getTransientSolrCoreCache();
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 44baa39..8b388cc 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -731,7 +731,7 @@ public class IndexFetcher {
ZkController zkController = solrCore.getCoreContainer().getZkController();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
- cd.getCollectionName(), cd.getShardId(), 3000, true);
+ cd.getCollectionName(), cd.getShardId(), 5000);
return leaderReplica;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 8f925c4..260e8e0 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
@@ -73,7 +74,7 @@ public class ColStatus {
}
@SuppressWarnings({"unchecked"})
- public void getColStatus(NamedList<Object> results) {
+ public void getColStatus(NamedList<Object> results) throws TimeoutException, InterruptedException {
Collection<String> collections;
String col = props.getStr(ZkStateReader.COLLECTION_PROP);
if (col == null) {
@@ -155,7 +156,7 @@ public class ColStatus {
sliceMap.add("routingRules", rules);
}
sliceMap.add("replicas", replicaMap);
- Replica leader = zkStateReader.getLeader(collection, s.getName());
+ Replica leader = zkStateReader.getLeaderRetry(collection, s.getName(), 10000);
if (leader == null) { // pick the first one
leader = s.getReplicas().size() > 0 ? s.getReplicas().iterator().next() : null;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 5f9f90e..bbf9cef 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -394,6 +394,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
} else {
if (log.isDebugEnabled()) log.debug("no data in response, checking for timeout");
if (System.nanoTime() - time >= TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) {
+ if (log.isDebugEnabled()) {
+ coreContainer.getZkController().getZkClient().printLayout();
+ }
throw new SolrException(ErrorCode.SERVER_ERROR, operation
+ " the collection time out:" + timeout / 1000 + "s " + m);
} else if (event.getWatchedEvent() != null) {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 8f17127..dee1675 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -18,9 +18,7 @@
package org.apache.solr.handler.admin;
import org.apache.solr.cloud.LeaderElector;
-import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.common.ParWork;
-import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
@@ -53,12 +51,28 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
String shard = params.get(ZkStateReader.SHARD_ID_PROP);
- Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
+ String state = params.get(ZkStateReader.STATE_PROP);
+
+ Replica.State waitForState = null;
+ if (state != null) {
+ waitForState = Replica.State.getState(state);
+ }
log.info(
"Going to wait for core: {}, state: {}: params={}",
cname, waitForState, params);
+ LeaderElector leaderElector = it.handler.coreContainer.getZkController().getLeaderElector(leaderName);
+ if (leaderElector == null || !leaderElector.isLeader()) {
+ throw new IllegalStateException("Not the valid leader (replica=" + leaderName + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
+ " coll=" + collection);
+ }
+
+ if (waitForState == null) {
+ log.info("Done checking leader:", cname);
+ return;
+ }
+
assert TestInjection.injectPrepRecoveryOpPauseForever();
CoreContainer coreContainer = it.handler.coreContainer;
@@ -66,6 +80,8 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
AtomicReference<String> errorMessage = new AtomicReference<>();
try {
+ Replica.State finalWaitForState = waitForState;
+ Replica.State finalWaitForState1 = waitForState;
coreContainer.getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
if (c == null) {
return false;
@@ -81,9 +97,9 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
isLive = coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName());
if (isLive) {
- if (replica.getState() == waitForState) {
+ if (replica.getState() == finalWaitForState) {
if (log.isDebugEnabled()) {
- log.debug("replica={} state={} waitForState={} isLive={}", replica, replica.getState(), waitForState,
+ log.debug("replica={} state={} waitForState={} isLive={}", replica, replica.getState(), finalWaitForState,
coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName()));
}
return true;
@@ -93,24 +109,15 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
}
errorMessage.set(
- "Timeout waiting to see " + waitForState + " state replica=" + cname + " state=" + (replica == null ? "(null replica)" : replica.getState())
- + " waitForState=" + waitForState + " isLive=" + isLive + "\n" + coreContainer.getZkController().getZkStateReader().getClusterState()
- .getCollectionOrNull(collection));
+ "Timeout waiting to see " + finalWaitForState + " state replica=" + cname + " state=" + (replica == null ? "(null replica)" : replica.getState())
+ + " waitForState=" + finalWaitForState1 + " isLive=" + isLive);
return false;
});
} catch (TimeoutException | InterruptedException e) {
ParWork.propagateInterrupt(e);
String error = errorMessage.get();
- if (error == null)
- error = "Timeout waiting for collection state. \n" + coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
- throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
+ log.error(error);
}
-
-// LeaderElector leaderElector = it.handler.coreContainer.getZkController().getLeaderElector(leaderName);
-// if (leaderElector == null || !leaderElector.isLeader()) {
-// throw new IllegalStateException("Not the valid leader (replica=" + leaderName + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
-// " coll=" + it.handler.getCoreContainer().getZkController().getClusterState().getCollectionOrNull(collection));
-// }
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 558f67e..d5152ca 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -49,6 +49,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentBase;
@@ -140,7 +141,14 @@ public class RealTimeGetComponent extends SearchComponent
}
String onlyIfLeader = params.get("onlyIfLeader");
- // MRM TODO:
+
+ if (req.getCore().getCoreContainer().isZooKeeperAware() && Boolean.parseBoolean(onlyIfLeader)) {
+ LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
+ if (leaderElector == null || !leaderElector.isLeader()) {
+ throw new IllegalStateException("Not the valid leader (replica=" + req.getCore().getName() + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) +
+ " coll=" + req.getCore().getCoreContainer().getZkController().getClusterState().getCollectionOrNull(req.getCore().getCoreDescriptor().getCollectionName()));
+ }
+ }
String val = params.get("getFingerprint");
if(val != null) {
diff --git a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
index f4cf0b8..3e99fa0 100644
--- a/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
+++ b/solr/core/src/java/org/apache/solr/rest/ManagedResourceStorage.java
@@ -149,7 +149,7 @@ public abstract class ManagedResourceStorage {
dir.mkdirs();
storageDir = dir.getAbsolutePath();
- log.info("File-based storage initialized to use dir: {}", storageDir);
+ log.debug("File-based storage initialized to use dir: {}", storageDir);
}
@Override
@@ -332,7 +332,7 @@ public abstract class ManagedResourceStorage {
byte[] znodeData = toByteArray();
try {
zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
- log.info("Wrote {} bytes to new znode {}", znodeData.length, znodePath);
+ log.debug("Wrote {} bytes to new znode {}", znodeData.length, znodePath);
} catch (KeeperException.NodeExistsException e) {
try {
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index e1cb9e8..15912da 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -18,6 +18,7 @@ package org.apache.solr.schema;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.IOUtils;
@@ -106,7 +107,7 @@ public class ZkIndexSchemaReader implements OnReconnect, Closeable {
public void close() throws IOException {
try {
schemaReader.zkClient.removeWatches(schemaReader.managedSchemaPath, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
if (log.isDebugEnabled()) {
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index a83165b..c9e803a 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -260,6 +260,7 @@ public class HttpSolrCall {
core = cores.getCore(origCorename);
if (core == null && cores.isCoreLoading(origCorename)) {
+ log.debug("core is loading, will wait a bit");
cores.waitForLoadingCore(origCorename, 5000);
core = cores.getCore(origCorename);
}
@@ -349,10 +350,10 @@ public class HttpSolrCall {
solrReq = parser.parse(core, path, req);
}
- invalidStates = checkStateVersionsAreValid(queryParams.get(CloudSolrClient.STATE_VERSION));
- ensureStatesAreAtLeastAtClient();
+ invalidStates = checkStateVersionsAreValid(getCollectionsList(), queryParams.get(CloudSolrClient.STATE_VERSION));
+ ensureStatesAreAtLeastAtClient();
addCollectionParamIfNeeded(getCollectionsList());
action = PROCESS;
@@ -494,17 +495,16 @@ public class HttpSolrCall {
protected void extractRemotePath(String collectionName) throws UnsupportedEncodingException, KeeperException, InterruptedException, SolrException, TimeoutException {
- ensureStatesAreAtLeastAtClient();
coreUrl = getRemoteCoreUrl(collectionName);
// don't proxy for internal update requests
- Map<String,Integer> invalidStates = checkStateVersionsAreValid(queryParams.get(CloudSolrClient.STATE_VERSION));
+// Map<String,Integer> invalidStates = checkStateVersionsAreValid(getCollectionsList(), queryParams.get(CloudSolrClient.STATE_VERSION));
if (coreUrl != null
&& queryParams.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
- if (invalidStates != null) {
- //it does not make sense to send the request to a remote node
- throw new SolrException(SolrException.ErrorCode.INVALID_STATE, new String(Utils.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8));
- }
+// if (invalidStates != null) {
+// //it does not make sense to send the request to a remote node
+// throw new SolrException(SolrException.ErrorCode.INVALID_STATE, new String(Utils.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8));
+// }
action = REMOTEQUERY;
} else {
if (!retry) {
@@ -1033,8 +1033,20 @@ public class HttpSolrCall {
}
/** Returns null if the state ({@link CloudSolrClient#STATE_VERSION}) is good; otherwise returns state problems. */
- private Map<String, Integer> checkStateVersionsAreValid(String stateVer) {
+ private Map<String, Integer> checkStateVersionsAreValid(List<String> collectionsList, String stateVer) {
// TODO: for collections that are local and watched, we should just wait for the right min state, not eager fetch everything
+// Set<String> colList = cores.getZkController().getZkStateReader().getClusterState().getCollectionsMap().keySet();
+// if ((stateVer == null || stateVer.isEmpty()) && cores.isZooKeeperAware()) {
+// StringBuilder sb = new StringBuilder();
+// for (String collection : colList) {
+// if (sb.length() > 0) {
+// sb.append("|");
+// }
+// sb.append(collection + ":0>0");
+// }
+// stateVer = sb.toString();
+// }
+
Map<String, Integer> result = null;
String[] pairs;
if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) {
@@ -1043,7 +1055,12 @@ public class HttpSolrCall {
for (String pair : pairs) {
String[] pcs = StringUtils.split(pair, ':');
if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
- Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], Integer.parseInt(pcs[1]));
+
+ String[] versionAndUpdatesHash = pcs[1].split(">");
+ int version = Integer.parseInt(versionAndUpdatesHash[0]);
+ int updateHash = Integer.parseInt(versionAndUpdatesHash[1]);
+
+ Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], version, updateHash);
if (status != null) {
if (result == null) result = new HashMap<>();
result.put(pcs[0], status);
@@ -1083,8 +1100,6 @@ public class HttpSolrCall {
protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) throws TimeoutException, InterruptedException {
log.debug("get core by collection {} {}", collectionName, isPreferLeader);
- ensureStatesAreAtLeastAtClient();
-
ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
@@ -1095,6 +1110,12 @@ public class HttpSolrCall {
return null;
}
+ try {
+ zkStateReader.waitForActiveCollection(collectionName, 10000, TimeUnit.MILLISECONDS, true, collection.getSlices().size(), collection.getReplicas().size(), false);
+ } catch (Exception e) {
+ log.warn("Did not find leaders for collection:" + collection.getName());
+ }
+
if (isPreferLeader) {
List<Replica> leaderReplicas = collection.getLeaderReplicas(cores.getZkController().getNodeName());
log.debug("preferLeader leaderReplicas={}", leaderReplicas);
@@ -1114,7 +1135,7 @@ public class HttpSolrCall {
RandomIterator<Replica> it = new RandomIterator<>(random, replicas);
while (it.hasNext()) {
Replica replica = it.next();
- if (!checkActive || (liveNodes.contains(replica.getNodeName()))) {
+ if (liveNodes.contains(replica.getNodeName())) {
SolrCore core = checkProps(replica);
if (core != null && checkActive && replica.getState() != Replica.State.ACTIVE) {
try {
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
index 27896e1..60131a5 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
@@ -406,27 +406,24 @@ public class SolrDispatchFilter extends BaseSolrFilter {
public void close() {
CoreContainer cc = cores;
-
try {
- if (metricManager != null) {
- try {
- metricManager.unregisterGauges(registryName, metricTag);
- } catch (NullPointerException e) {
- // okay
- } catch (Exception e) {
- log.warn("Exception closing FileCleaningTracker", e);
- } finally {
- metricManager = null;
- }
- }
+// if (metricManager != null) {
+// try {
+// metricManager.unregisterGauges(registryName, metricTag);
+// } catch (NullPointerException e) {
+// // okay
+// } catch (Exception e) {
+// log.warn("Exception closing FileCleaningTracker", e);
+// } finally {
+// metricManager = null;
+// }
+// }
} finally {
- // if (!cc.isShutDown()) {
+ if (!cc.isShutDown()) {
log.info("CoreContainer is not yet shutdown during filter destroy, shutting down now {}", cc);
GlobalTracer.get().close();
stopCoreContainer(cc);
- // }
-
-
+ }
// if (SolrLifcycleListener.isRegisteredStopped(stopRunnable)) {
// SolrLifcycleListener.removeStopped(stopRunnable);
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index a1c8249..1bfe076 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
* Used for distributing commands from a shard leader to its replicas.
*/
public class SolrCmdDistributor implements Closeable {
- private static final int MAX_RETRIES_ON_FORWARD = 2;
+ private static final int MAX_RETRIES_ON_FORWARD = 10;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ConnectionManager.IsClosed isClosed;
private final ZkStateReader zkStateReader;
@@ -136,6 +136,11 @@ public class SolrCmdDistributor implements Closeable {
}
if (err.req.retries < maxRetries && doRetry && !isClosed.isClosed()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+
+ }
err.req.retries++;
SolrException.log(SolrCmdDistributor.log, "sending update to "
@@ -457,7 +462,7 @@ public class SolrCmdDistributor implements Closeable {
public static class StdNode extends Node {
protected final ZkStateReader zkStateReader;
- protected Replica nodeProps;
+ protected volatile Replica nodeProps;
protected String collection;
protected String shardId;
private final boolean retry;
@@ -505,7 +510,7 @@ public class SolrCmdDistributor implements Closeable {
@Override
public boolean checkRetry() {
- return true;
+ return false;
}
@Override
@@ -579,6 +584,7 @@ public class SolrCmdDistributor implements Closeable {
@Override
public boolean checkRetry() {
+ log.debug("check retry");
Replica leaderProps;
try {
leaderProps = zkStateReader.getLeaderRetry(
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
index a757c47..8de165c 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
@@ -231,7 +231,8 @@ public class SolrIndexWriter extends IndexWriter {
directoryFactory.release(getDirectory());
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating IndexWriter");
}
- assert ObjectReleaseTracker.track(this);
+ // TODO:
+ // assert ObjectReleaseTracker.track(this);
}
public static Directory getDir(DirectoryFactory directoryFactory, String path, SolrIndexConfig config) {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index b9c6461..5eb07c9 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1913,7 +1913,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
- protected RecoveryInfo recoveryInfo;
+ protected volatile RecoveryInfo recoveryInfo;
class LogReplayer implements Runnable {
private Logger loglog = log; // set to something different?
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 1a2d28a..01a7572 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -466,7 +466,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.add(cmd);
- log.info("docid={} dropped because not active and buffering and not a replay update", cmd.getPrintableId());
+ log.debug("docid={} buffering update", cmd.getPrintableId());
if (SkyHook.skyHookDoc != null) {
SkyHook.skyHookDoc.register(cmd.getPrintableId(), "dropping update, non logic applied, but we are buffering - added to ulog only");
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index b5ce87c..e59b421 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.LeaderElector;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
@@ -164,7 +165,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
public void processCommit(CommitUpdateCommand cmd) throws IOException {
Replica leaderReplica;
try {
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, desc.getCloudDescriptor().getShardId(), 10000);
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, desc.getCloudDescriptor().getShardId(), 1000);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
@@ -682,7 +683,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
+ "failed since we're not in cloud mode.");
}
try {
- return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 3000, true).getCoreUrl();
+ return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId(), 5000).getCoreUrl();
} catch (InterruptedException | TimeoutException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
@@ -773,10 +774,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
String shardId = slice.getName();
Replica leaderReplica;
try {
+
+ doDefensiveChecks(phase);
+
// Not equivalent to getLeaderProps, which retries to find a leader.
- // Replica leader = slice.getLeader();
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 5000);
- isLeader = leaderReplica.getName().equals(desc.getName());
+ leaderReplica = slice.getLeader();
+// leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 10000);
+ isLeader = leaderReplica != null && leaderReplica.getName().equals(desc.getName());
if (!isLeader) {
isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
@@ -785,8 +789,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
}
- doDefensiveChecks(phase);
-
// if request is coming from another collection then we want it to be sent to all replicas
// even if its phase is FROMLEADER
String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);
@@ -958,7 +960,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
Replica myLeader = null;
try {
- myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId, 1500, true);
+ myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId, 5000);
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "error getting leader", e);
}
@@ -1152,6 +1154,15 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
DocCollection docCollection = clusterState.getCollection(collection);
Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
+ if (DistribPhase.TOLEADER == phase) {
+ LeaderElector leaderElector = req.getCore().getCoreContainer().getZkController().getLeaderElector(req.getCore().getName());
+ if (leaderElector == null || !leaderElector.isLeader()) {
+ throw new IllegalStateException(
+ "Not the valid leader (replica=" + req.getCore().getName() + ")" + (leaderElector == null ? "No leader elector" : "Elector state=" + leaderElector.getState()) + " coll=" + req.getCore()
+ .getCoreContainer().getZkController().getClusterState().getCollectionOrNull(req.getCore().getCoreDescriptor().getCollectionName()));
+ }
+ }
+
if (DistribPhase.FROMLEADER == phase && isLeader && from != null) { // from will be null on log replay
String fromShard = req.getParams().get(DISTRIB_FROM_PARENT);
if (fromShard != null) {
diff --git a/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java b/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
index 7400591..351ecec 100644
--- a/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
+++ b/solr/core/src/java/org/apache/solr/util/plugin/AbstractPluginLoader.java
@@ -149,7 +149,7 @@ public abstract class AbstractPluginLoader<T>
AtomicReference<T> defaultPlugin = new AtomicReference<>();
XPath xpath = loader.getXPath();
if (nodes !=null ) {
- for (int i=0; i<nodes.size(); i++) {
+ for (int i = 0; i < nodes.size(); i++) {
try (ParWork parWork = new ParWork(this, false, true)) {
NodeInfo node = nodes.get(i);
@@ -164,40 +164,39 @@ public abstract class AbstractPluginLoader<T>
}
String finalName = name;
- parWork.collect(name, ()->{
- try {
- T plugin = create(loader, finalName, className, node, xpath);
- if (log.isTraceEnabled()) {
- log.trace("created {}: {}", ((finalName != null) ? finalName : ""), plugin.getClass().getName());
- }
+ try {
+ T plugin = create(loader, finalName, className, node, xpath);
- // Either initialize now or wait till everything has been registered
- if (preRegister) {
- info.add(new PluginInitInfo(plugin, node));
- } else {
- init(plugin, node);
- }
+ if (log.isTraceEnabled()) {
+ log.trace("created {}: {}", ((finalName != null) ? finalName : ""), plugin.getClass().getName());
+ }
- T old = register(finalName, plugin);
- if (old != null && !(finalName == null && !requireName)) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple " + type + " registered to the same name: " + finalName + " ignoring: " + old);
- }
+ // Either initialize now or wait till everything has been registered
+ if (preRegister) {
+ info.add(new PluginInitInfo(plugin, node));
+ } else {
+ init(plugin, node);
+ }
- if (defaultStr != null && Boolean.parseBoolean(defaultStr)) {
- if (defaultPlugin.get() != null) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple default " + type + " plugins: " + defaultPlugin + " AND " + finalName);
- }
- defaultPlugin.set(plugin);
- }
- } catch (Exception e) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- } else {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ T old = register(finalName, plugin);
+ if (old != null && !(finalName == null && !requireName)) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple " + type + " registered to the same name: " + finalName + " ignoring: " + old);
+ }
+
+ if (defaultStr != null && Boolean.parseBoolean(defaultStr)) {
+ if (defaultPlugin.get() != null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Multiple default " + type + " plugins: " + defaultPlugin + " AND " + finalName);
}
+ defaultPlugin.set(plugin);
}
- });
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ }
} catch (Exception ex) {
ParWork.propagateInterrupt(ex);
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index b94af95..45be93a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -89,7 +89,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
System.setProperty("solr.zkclienttimeout", "15000");
System.setProperty("zkClientTimeout", "15000");
-
+ System.setProperty("solr.getleader.looptimeout", "10000");
String timeout = "640000";
System.setProperty("solr.http2solrclient.default.idletimeout", timeout);
System.setProperty("distribUpdateSoTimeout", timeout);
@@ -438,6 +438,18 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
// Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
// assertEquals(1, coresStatus.size());
+ cluster.getSolrClient().getZkStateReader().waitForState(collectionName, 5000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ for (Replica replica : collectionState.getReplicas()) {
+ if (replica.getState() != Replica.State.ACTIVE) {
+ return false;
+ }
+ }
+ return true;
+ });
+
DocCollection testCollection = getCollectionState(collectionName);
Replica replica1 = testCollection.getReplicas().iterator().next();
@@ -555,6 +567,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
@LuceneTestCase.Nightly
public void testColStatus() throws Exception {
final String collectionName = "collectionStatusTest";
+
CollectionAdminRequest.createCollection(collectionName, "conf2", 2, 2)
.setMaxShardsPerNode(100)
.waitForFinalState(true)
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 5e7a97e..4a506ed 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -101,6 +101,8 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
Create req = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2).waitForFinalState(true);
req.process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(collectionName, 2, 4);
+
DocCollection state = getCollectionState(collectionName);
Slice shard = getRandomShard(state);
@@ -151,7 +153,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
final String collectionName = "deletereplica_test";
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2).waitForFinalState(true).process(cluster.getSolrClient());
- Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1");
+ Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 5000, true);
//Confirm that the instance and data directory exist
CoreStatus coreStatus = getCoreStatus(leader);
@@ -162,9 +164,9 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
req.setWaitForFinalState(true);
req.process(cluster.getSolrClient());
- Replica newLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 2000);
-
+ log.info("leader was {}", leader);
+ Replica newLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 5000, true);
org.apache.solr.common.util.TimeOut timeOut = new org.apache.solr.common.util.TimeOut(2000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
@@ -398,7 +400,9 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
int finalI = i;
threads[i] = new Thread(() -> {
int doc = finalI * (TEST_NIGHTLY ? 10000 : 100);
+ int cnt = 0;
while (!closed.get()) {
+ cnt++;
try {
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", String.valueOf(doc++)));
} catch (AlreadyClosedException e) {
@@ -407,15 +411,18 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
} catch (Exception e) {
log.error("Failed on adding document to {}", collectionName, e);
}
+ // TODO: why did this not stop anymore? Need to write out state sooner?
+ if (cnt > 10000) {
+ break;
+ }
}
});
futures.add(ParWork.getRootSharedExecutor().submit(threads[i]));
}
-
-
+ Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collectionName, "s1", 5000);
Slice shard1 = getCollectionState(collectionName).getSlice("s1");
- Replica nonLeader = shard1.getReplicas(rep -> !rep.getName().equals(shard1.getLeader().getName())).get(0);
+ Replica nonLeader = shard1.getReplicas(rep -> !rep.getName().equals(leader.getName())).get(0);
CollectionAdminRequest.DeleteReplica req = CollectionAdminRequest.deleteReplica(collectionName, "s1", nonLeader.getName());
req.setWaitForFinalState(true);
req.process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index 95e9bb7..7740fa2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -137,6 +137,8 @@ public class DeleteShardTest extends SolrCloudTestCase {
.waitForFinalState(true)
.process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(collection, 3, 3);
+
// Get replica details
Replica leader = getCollectionState(collection).getLeader("a");
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 8c8a05e..b39cdf3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -526,7 +526,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
final String shardName = entry.getKey();
final Slice slice = entry.getValue();
log.info("Checking: {} -> {}", shardName, slice);
- final Replica leader = entry.getValue().getLeader();
+ final Replica leader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(collection.getName(), shardName, 5000);
try (Http2SolrClient leaderClient = SolrTestCaseJ4.getHttpSolrClient(leader.getCoreUrl())) {
final SolrDocumentList leaderResults = leaderClient.query(perReplicaParams).getResults();
log.debug("Shard {}: Leader results: {}", shardName, leaderResults);
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
index af02db3..b447534 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
@@ -98,8 +98,6 @@ public class RecoveryZkTest extends SolrCloudTestCase {
cluster.getSolrClient().setDefaultCollection(collection);
- cluster.waitForActiveCollection(collection, 1, 2);
-
// start a couple indexing threads
int[] maxDocList = new int[] {25, 55};
@@ -156,15 +154,27 @@ public class RecoveryZkTest extends SolrCloudTestCase {
cluster.waitForActiveCollection(collection, 1, 2);
- new UpdateRequest()
- .commit(cluster.getSolrClient(), collection);
-
- cluster.waitForActiveCollection(collection, 1, 2);
-
// test that leader and replica have same doc count
- state = getCollectionState(collection);
- assertShardConsistency(state.getSlice("s1"), true);
+ int cnt = 0;
+ while (true) {
+ try {
+ new UpdateRequest().commit(cluster.getSolrClient(), collection);
+ } catch (Exception e) {
+ log.info("commit fail", e);
+ }
+
+ try {
+ state = getCollectionState(collection);
+ assertShardConsistency(state.getSlice("s1"), true);
+ break;
+ } catch (AssertionError error) {
+ if (cnt++ > 5) {
+ throw error;
+ }
+ }
+ Thread.sleep(500);
+ }
}
private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
index 335e0cd..f1db9fb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
@@ -440,7 +440,7 @@ public abstract class SolrCloudBridgeTestCase extends SolrCloudTestCase {
leader = tmp;
break;
}
- Thread.sleep(50);
+ Thread.sleep(250);
}
assertNotNull("Could not find active leader for " + shardId + " of " +
testCollectionName + " after "+timeoutSecs+" secs;", leader);
diff --git a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
index e341bae..2a1d35a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SyncSliceTest.java
@@ -23,7 +23,6 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
@@ -147,9 +146,9 @@ public class SyncSliceTest extends SolrCloudBridgeTestCase {
// updateMappingsFromZk(this.jettys, this.clients);
leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(COLLECTION, "s1", 5)));
if (deadJetty == leaderJetty) {
- Thread.sleep(100);
+ Thread.sleep(500);
}
- if (cnt++ >= 3) {
+ if (cnt++ >= 30) {
fail("don't expect leader to be on the jetty we stopped deadJetty=" + deadJetty.getNodeName() + " leaderJetty=" + leaderJetty.getNodeName());
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java
index 0037f1a..bad1713 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudDeleteByQuery.java
@@ -130,6 +130,8 @@ public class TestCloudDeleteByQuery extends SolrCloudTestCase {
.setProperties(collectionProperties)
.process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(COLLECTION_NAME, NUM_SHARDS, NUM_SHARDS * REPLICATION_FACTOR);
+
CLOUD_CLIENT = cluster.getSolrClient();
CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
index 5b7e436..9f04280 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -37,11 +38,13 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -62,6 +65,9 @@ public class TestCloudRecovery extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
+
+ System.setProperty("solr.getleader.looptimeout", "10000");
+ System.setProperty("solr.recovery.maxretries", "5");
System.setProperty("solr.enableMetrics", "true");
System.setProperty("solr.disableDefaultJmxReporter", "false");
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
@@ -73,7 +79,7 @@ public class TestCloudRecovery extends SolrCloudTestCase {
public void setUp() throws Exception {
super.setUp();
useFactory(null);
- configureCluster(2)
+ configureCluster(4)
.addConfig("config", SolrTestUtil.TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
@@ -137,6 +143,14 @@ public class TestCloudRecovery extends SolrCloudTestCase {
cluster.waitForActiveCollection(COLLECTION, 2, 2 * (nrtReplicas + tlogReplicas));
+ TimeOut timeout = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut()) {
+ resp = cloudClient.query(COLLECTION, params);
+ if (resp.getResults().getNumFound() >= 4) {
+ break;
+ }
+ }
+
resp = cloudClient.query(COLLECTION, params);
assertEquals(4, resp.getResults().getNumFound());
// Make sure all leader nodes recover from tlog
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
index a15fd89..2e6678e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
@@ -71,6 +71,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
node2.stop();
cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
+ cluster.waitForActiveCollection(COLLECTION, 1, 1, true);
UpdateRequest req = new UpdateRequest();
for (int i = 0; i < 100; i++) {
@@ -154,7 +155,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
v = client1.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
assertEquals("30", v.toString());
}
- Replica oldLeader = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION).getLeader("s1");
+ Replica oldLeader = cluster.getSolrClient().getZkStateReader().getLeaderRetry(COLLECTION,"s1");
node1.stop();
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index e42bada..deed435 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -122,6 +122,7 @@ public class TestPullReplica extends SolrCloudTestCase {
CollectionAdminRequest
.createCollection(collectionName, "conf", 2, 1, 0, 3).waitForFinalState(true)
.process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(collectionName, 2, 8);
break;
case 1:
// Sometimes use v1 API
@@ -132,6 +133,7 @@ public class TestPullReplica extends SolrCloudTestCase {
3); // pullReplicas);
url = url + SolrTestCaseUtil.pickRandom("", "&nrtReplicas=1", "&replicationFactor=1"); // These options should all mean the same
Http2SolrClient.GET(url, cluster.getSolrClient().getHttpClient());
+ cluster.waitForActiveCollection(collectionName, 2, 8);
break;
case 2:
// Sometimes use V2 API
@@ -146,6 +148,7 @@ public class TestPullReplica extends SolrCloudTestCase {
.POST(url, cluster.getSolrClient().getHttpClient(),
requestBody.getBytes("UTF-8"), "application/json");
assertEquals(200, response.status);
+ cluster.waitForActiveCollection(collectionName, 2, 8);
break;
}
boolean reloaded = false;
@@ -213,7 +216,7 @@ public class TestPullReplica extends SolrCloudTestCase {
int numPullReplicas = 1 + random().nextInt(3);
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numPullReplicas).waitForFinalState(true)
.process(cluster.getSolrClient());
- waitForState("Expected collection to be created with 1 shard and " + (numPullReplicas + 1) + " replicas", collectionName, clusterShape(1, numPullReplicas + 1));
+ cluster.waitForActiveCollection(collectionName, 1, numPullReplicas + 1);
DocCollection docCollection = assertNumberOfReplicas(1, 0, numPullReplicas, false, true);
assertEquals(1, docCollection.getSlices().size());
@@ -524,7 +527,7 @@ public class TestPullReplica extends SolrCloudTestCase {
}
private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, String query) throws IOException, SolrServerException, InterruptedException {
- TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ TimeOut t = new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME);
for (Replica r : replicas) {
if (cluster.getSolrClient().getZkStateReader().isNodeLive(r.getNodeName())) {
try (Http2SolrClient replicaClient = SolrTestCaseJ4.getHttpSolrClient(r.getCoreUrl())) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java b/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
index e1a673b..6ac8c58 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
@@ -83,7 +83,7 @@ public class TestSegmentSorting extends SolrCloudTestCase {
.setProperties(collectionProperties);
assertTrue( cmd.process(cloudSolrClient).isSuccess() );
-
+ cluster.waitForActiveCollection(collectionName, NUM_SHARDS, NUM_SHARDS * (TEST_NIGHTLY ? REPLICATION_FACTOR : 1));
cloudSolrClient.setDefaultCollection(collectionName);
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
index 438b9a3..2a65bb4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java
@@ -56,7 +56,7 @@ public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
(1, SolrTestUtil.createTempDir(), buildJettyConfig("/solr"));
try {
log.info("Create our collection");
- CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).waitForFinalState(true).process(cluster.getSolrClient());
log.info("Shutdown 1 node");
final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
index 8344c8d..0f679fd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionReloadTest.java
@@ -52,6 +52,7 @@ public class CollectionReloadTest extends SolrCloudTestCase {
final String testCollectionName = "c8n_1x1";
CollectionAdminRequest.createCollection(testCollectionName, "conf", 1, 1)
+ .waitForFinalState(true)
.process(cluster.getSolrClient());
CollectionAdminRequest.reloadCollection(testCollectionName).process(cluster.getSolrClient());
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index bfd55b3..dbe3b12 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -26,6 +26,7 @@ import org.apache.solr.cloud.StoppableIndexingThread;
import org.apache.solr.common.ParWork;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit;
@Slow
@LuceneTestCase.Nightly
+@Ignore // temporarily disabled: uses too much RAM for tests
public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -56,7 +58,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
@Test
public void start() throws Exception {
- int collectionCnt = 5;
+ int collectionCnt = 80;
List<Future> futures = new ArrayList<>();
List<Future> indexFutures = new ArrayList<>();
for (int i = 0; i < collectionCnt; i ++) {
@@ -96,18 +98,22 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
cluster.waitForActiveCollection(collectionName, 4, 16);
}
-
+ List<JettySolrRunner> stoppedRunners = new ArrayList<>();
for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
log.info("Stopping {}", runner);
+ if (random().nextBoolean()) {
+ continue;
+ }
runner.stop();
+ stoppedRunners.add(runner);
}
- for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
+ for (JettySolrRunner runner : stoppedRunners) {
log.info("Starting {}", runner);
runner.start();
}
-
+ Thread.sleep(5000);
for (int r = 0; r < 2; r++) {
for (int i = 0; i < collectionCnt; i++) {
final String collectionName = "testCollection" + i;
diff --git a/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java b/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
index d3201f0..954dd35 100644
--- a/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/core/CachingDirectoryFactoryTest.java
@@ -75,7 +75,7 @@ public class CachingDirectoryFactoryTest extends SolrTestCaseJ4 {
incRefThread.start();
}
- Thread.sleep(TEST_NIGHTLY ? 2000 : 50);
+ Thread.sleep(TEST_NIGHTLY ? LuceneTestCase.atLeast(500) : 50);
Thread closeThread = new Thread() {
public void run() {
diff --git a/solr/core/src/test/org/apache/solr/core/TestConfigSets.java b/solr/core/src/test/org/apache/solr/core/TestConfigSets.java
index 70af6a0..1584883 100644
--- a/solr/core/src/test/org/apache/solr/core/TestConfigSets.java
+++ b/solr/core/src/test/org/apache/solr/core/TestConfigSets.java
@@ -136,10 +136,10 @@ public class TestConfigSets extends SolrTestCaseJ4 {
// Now copy in a config with a /dump handler and reload
FileUtils.copyFile(SolrTestUtil.getFile("solr/collection1/conf/solrconfig-withgethandler.xml"), new File(new File(configSetsDir, "configset-2/conf"), "solrconfig.xml"));
container.reload("core1");
- core.close();
+
core = container.getCore("core1");
assertThat("A /dump handler should be defined in the reloaded configuration", core.getRequestHandler("/dump"), is(notNullValue()));
- core.close();
+
} finally {
container.shutdown();
}
diff --git a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
index f8f5284..400699f 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/SearchHandlerTest.java
@@ -196,6 +196,8 @@ public class SearchHandlerTest extends SolrTestCaseJ4
CollectionAdminRequest.createCollection(collectionName, configName, 2, 2).waitForFinalState(true)
.process(miniCluster.getSolrClient());
+ miniCluster.waitForActiveCollection(collectionName, 2, 4);
+
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(ShardParams.SHARDS_TOLERANT, "requireZkConnected");
QueryRequest req = new QueryRequest(params);
diff --git a/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java b/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java
index ef17209..11a07f6 100644
--- a/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java
+++ b/solr/core/src/test/org/apache/solr/rest/SolrRestletTestBase.java
@@ -69,6 +69,8 @@ abstract public class SolrRestletTestBase extends RestTestBase {
@After
public void tearDown() throws Exception {
+ if (jetty != null) jetty.stop();
+ jetty = null;
super.tearDown();
}
}
diff --git a/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java b/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java
index 56dc635..31a235d 100644
--- a/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java
+++ b/solr/core/src/test/org/apache/solr/rest/schema/TestUniqueKeyFieldResource.java
@@ -34,10 +34,11 @@ public class TestUniqueKeyFieldResource extends SolrRestletTestBase {
@After
public void tearDown() throws Exception {
- super.tearDown();
if (jetty != null) {
jetty.stop();
}
+ jetty = null;
+ super.tearDown();
}
@Test
diff --git a/solr/server/etc/jetty-http.xml b/solr/server/etc/jetty-http.xml
index a38208b..b24cd98 100644
--- a/solr/server/etc/jetty-http.xml
+++ b/solr/server/etc/jetty-http.xml
@@ -34,7 +34,7 @@
<Item>
<New class="org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory">
<Arg name="config"><Ref refid="httpConfig" /></Arg>
- <Set name="maxConcurrentStreams">1024</Set>
+ <Set name="maxConcurrentStreams">4096</Set>
<Set name="inputBufferSize">8192</Set>
<Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="240000"/></Set>
<Set name="rateControlFactory">
diff --git a/solr/server/etc/jetty-https.xml b/solr/server/etc/jetty-https.xml
index c12b4f6..7ac4417 100644
--- a/solr/server/etc/jetty-https.xml
+++ b/solr/server/etc/jetty-https.xml
@@ -54,7 +54,7 @@
<Item>
<New class="org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory">
<Arg name="config"><Ref refid="sslHttpConfig"/></Arg>
- <Set name="maxConcurrentStreams">1024</Set>
+ <Set name="maxConcurrentStreams">4096</Set>
<Set name="inputBufferSize">8192</Set>
<Set name="streamIdleTimeout"><Property name="solr.jetty.http.streamIdleTimeout" default="240000"/></Set>
<Set name="rateControlFactory">
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
index a4c8326..258d62a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
@@ -186,7 +186,12 @@ public class ShardTerms implements MapWriter {
*/
public ShardTerms setTermEqualsToLeader(String coreNodeName) {
long maxTerm = getMaxTerm();
- if (values.get(coreNodeName) == maxTerm) return null;
+ Long term = values.get(coreNodeName);
+ if (term == null) {
+ registerTerm(coreNodeName);
+ term = 0l;
+ }
+ if (term == maxTerm) return null;
Map<String, Long> newValues = new ConcurrentHashMap<>(values);
newValues.put(coreNodeName, maxTerm);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 9357419..69f090b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -102,7 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
private volatile String defaultCollection;
//no of times collection state to be reloaded if stale state error is received
- private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "3"));
+ private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "10"));
private Random rand = new Random();
private final boolean updatesToLeaders;
@@ -112,7 +112,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
private final ExecutorService threadPool;
private String idField = ID;
public static final String STATE_VERSION = "_stateVer_";
- private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);//3 seconds or 3 million nanos
+ private long retryExpiryTime = TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS); // 5 seconds
private final Set<String> NON_ROUTABLE_PARAMS;
{
NON_ROUTABLE_PARAMS = new HashSet<>();
@@ -366,7 +366,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
catch (RuntimeException e) {
// not ready yet, then...
}
- TimeUnit.MILLISECONDS.sleep(250);
+ TimeUnit.MILLISECONDS.sleep(50);
}
throw new TimeoutException("Timed out waiting for cluster");
}
@@ -482,9 +482,9 @@ public abstract class BaseCloudSolrClient extends SolrClient {
DocCollection col = getDocCollection(collection, null);
- if (col == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collection);
- }
+// if (col == null) {
+// throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collection);
+// }
DocRouter router = col.getRouter();
@@ -838,7 +838,6 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
List<String> inputCollections =
collection == null ? Collections.emptyList() : StrUtils.splitSmart(collection, ",", true);
-
return requestWithRetryOnStaleState(request, 0, inputCollections);
}
@@ -878,18 +877,18 @@ public abstract class BaseCloudSolrClient extends SolrClient {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
}
int collVer = coll.getZNodeVersion();
- if (coll.getStateFormat()>1) {
- if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
- requestedCollections.add(coll);
- if (stateVerParamBuilder == null) {
- stateVerParamBuilder = new StringBuilder();
- } else {
- stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
- }
+ if (requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+ requestedCollections.add(coll);
- stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+ if (stateVerParamBuilder == null) {
+ stateVerParamBuilder = new StringBuilder();
+ } else {
+ stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
}
+ Map su = coll.getStateUpdates();
+ stateVerParamBuilder.append(coll.getName()).append(":").append(collVer).append(">").append(su == null ? 0 : su.hashCode());
+
}
if (stateVerParamBuilder != null) {
@@ -980,7 +979,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
boolean stateWasStale = false;
- if (retryCount < MAX_STALE_RETRIES &&
+ if (retryCount <= MAX_STALE_RETRIES &&
requestedCollections != null &&
!requestedCollections.isEmpty() &&
(SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
@@ -997,7 +996,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
// if we experienced a communication error, it's worth checking the state
// with ZK just to make sure the node we're trying to hit is still part of the collection
- if (retryCount < MAX_STALE_RETRIES &&
+ if (retryCount <= MAX_STALE_RETRIES &&
!stateWasStale &&
requestedCollections != null &&
!requestedCollections.isEmpty() &&
@@ -1020,7 +1019,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
// if the state was stale, then we retry the request once with new state pulled from Zk
if (stateWasStale) {
- log.warn("Re-trying request to collection(s) {} after stale state error from server. retryCound={}", inputCollections, retryCount);
+ log.warn("Re-trying request to collection(s) {} after stale state error from server.", inputCollections);
resp = requestWithRetryOnStaleState(request, retryCount+1, inputCollections);
} else {
if (exc instanceof SolrException || exc instanceof SolrServerException || exc instanceof IOException) {
@@ -1030,62 +1029,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
}
}
- if (resp != null && resp.get("exception") == null) {
- waitForClusterStateUpdates(request, resp);
- }
return resp;
}
- protected void waitForClusterStateUpdates(SolrRequest request, NamedList<Object> resp) {
-
- // TODO - better check that this is a collections api call
- if (request.getParams() == null || request.getParams().get(CoreAdminParams.ACTION) == null) {
- return;
- }
-
- String action = request.getParams().get(CoreAdminParams.ACTION);
-
- String collection = request.getParams().get("collection");
- if (collection == null) {
- collection = request.getParams().get("name");
- }
- if (collection != null) {
- Integer ver = (Integer) resp.get("csver");
- if (ver != null) {
- try {
- log.info("Wait for catch up to server state {}", ver);
- getZkStateReader().waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
- if (collectionState != null && collectionState.getZNodeVersion() >= ver) {
- return true;
- }
- return false;
- });
- } catch (TimeoutException | InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for collection version " + ver, e);
- }
- } else if (request.getParams().get(CoreAdminParams.ACTION).equals(CollectionParams.CollectionAction.DELETE.toString())) {
-// try {
-// getZkStateReader().waitForState(collection, 10, TimeUnit.SECONDS, (c) -> c == null);
-// } catch (InterruptedException e) {
-// ParWork.propagateInterrupt(e);
-// throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
-// } catch (TimeoutException e) {
-// throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-// }
- }
-
- }
- }
-
protected NamedList<Object> sendRequest(SolrRequest request, List<String> inputCollections)
throws SolrServerException, IOException {
-
-// if (request.getParams().get(STATE_VERSION) == null) {
-// throw new IllegalStateException("State version cannot be null " + request.getParams());
-// }
-
connect();
boolean sendToLeaders = false;
@@ -1127,7 +1076,9 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
} else if (ADMIN_PATHS.contains(request.getPath())) {
- for (String liveNode : liveNodes) {
+ List<String> liveNodesList = new ArrayList<>(liveNodes);
+ Collections.shuffle(liveNodesList, rand);
+ for (String liveNode : liveNodesList) {
theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
getClusterStateProvider().getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
}
@@ -1143,35 +1094,67 @@ public abstract class BaseCloudSolrClient extends SolrClient {
// at every shard when getting leaders if we tweaked some things
// Retrieve slices from the cloud state and, for each collection specified, add it to the Map of slices.
- Map<String,Slice> slices = new HashMap<>();
- String shardKeys = reqParams.get(ShardParams._ROUTE_);
- for (String collectionName : collectionNames) {
- DocCollection col = getDocCollection(collectionName, null);
- if (col == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
- }
- List<Slice> routeSlices = new ArrayList<>(col.getRouter().getSearchSlices(shardKeys, reqParams , col));
- Collections.shuffle(routeSlices);
- ClientUtils.addSlices(slices, collectionName, routeSlices, true);
- }
-
- // Gather URLs, grouped by leader or replica
List<Replica> sortedReplicas = new ArrayList<>();
List<Replica> replicas = new ArrayList<>();
- for (Slice slice : slices.values()) {
- Replica leader = slice.getLeader();
- ArrayList<Replica> replicaList = new ArrayList<>(slice.getReplicas());
- Collections.shuffle(replicaList);
- for (Replica replica : replicaList) {
- String node = replica.getNodeName();
- if (!liveNodes.contains(node) // Must be a live node to continue
- || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
- continue;
- if (sendToLeaders && replica.equals(leader)) {
- sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
- } else {
- replicas.add(replica); // replicas here
+ for (int i = 0; i < 2; i++) {
+ Map<String,Slice> slices = new HashMap<>();
+ String shardKeys = reqParams.get(ShardParams._ROUTE_);
+ for (String collectionName : collectionNames) {
+ DocCollection col = getDocCollection(collectionName, null);
+ if (col == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
}
+ slices.putAll(col.getSlicesMap());
+ List<Slice> routeSlices = new ArrayList<>(col.getRouter().getSearchSlices(shardKeys, reqParams, col));
+ Collections.shuffle(routeSlices);
+ ClientUtils.addSlices(slices, collectionName, routeSlices, true);
+ }
+
+ // Gather URLs, grouped by leader or replica
+
+ for (Slice slice : slices.values()) {
+ Replica leader = slice.getLeader();
+ ArrayList<Replica> replicaList = new ArrayList<>(slice.getReplicas());
+ Collections.shuffle(replicaList);
+ for (Replica replica : replicaList) {
+ String node = replica.getNodeName();
+ if (!liveNodes.contains(node) // Must be a live node to continue
+ || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+ continue;
+ if (sendToLeaders && replica.equals(leader)) {
+ sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
+ } else {
+ replicas.add(replica); // replicas here
+ }
+ }
+ }
+
+ if (sortedReplicas.size() == 0) {
+ if (getClusterStateProvider() instanceof ZkClientClusterStateProvider) {
+ ZkClientClusterStateProvider provider = (ZkClientClusterStateProvider) getClusterStateProvider();
+ getClusterStateProvider().connect();
+ for (String collectionName : collectionNames) {
+ try {
+ provider.zkStateReader.waitForState(collectionName, 5, TimeUnit.SECONDS, (liveNodes1, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ List<Replica> reps = collectionState.getReplicas();
+ for (Replica rep : reps) {
+ if (liveNodes1.contains(rep.getNodeName()) && rep.getState() == Replica.State.ACTIVE) {
+ return true;
+ }
+ } return false;
+ });
+ } catch (InterruptedException e) {
+
+ } catch (TimeoutException e) {
+
+ }
+ }
+ }
+ } else {
+ break;
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index d4e7cdc..1020d22 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -314,20 +314,23 @@ public class Http2SolrClient extends SolrClient {
if (log.isTraceEnabled()) log.trace("Closing {} closeClient={}", this.getClass().getSimpleName(), closeClient);
// assert closeTracker != null ? closeTracker.close() : true;
try {
- asyncTracker.close();
- } catch (Exception e) {
- log.error("Exception closing httpClient asyncTracker", e);
- }
- closed = true;
- if (closeClient) {
try {
- httpClient.stop();
+ asyncTracker.close();
} catch (Exception e) {
- log.error("Exception closing httpClient", e);
+ log.error("Exception closing httpClient asyncTracker", e);
+ }
+ closed = true;
+ if (closeClient) {
+ try {
+ httpClient.stop();
+ } catch (Exception e) {
+ log.error("Exception closing httpClient", e);
+ }
}
+ if (log.isTraceEnabled()) log.trace("Done closing {}", this.getClass().getSimpleName());
+ } finally {
+ assert ObjectReleaseTracker.release(this);
}
- if (log.isTraceEnabled()) log.trace("Done closing {}", this.getClass().getSimpleName());
- assert ObjectReleaseTracker.release(this);
}
public void waitForOutstandingRequests() {
@@ -1150,7 +1153,7 @@ public class Http2SolrClient extends SolrClient {
private SSLConfig sslConfig = defaultSSLConfig;
private Integer idleTimeout = DEFAULT_IDLE_TIME;
private Integer connectionTimeout;
- private Integer maxConnectionsPerHost = 32;
+ private Integer maxConnectionsPerHost = 64;
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
protected String baseSolrUrl;
protected Map<String,String> headers = new HashMap<>(12);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index b55ce1a..13ad78c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -63,11 +63,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
private final Integer numPullReplicas;
private final Integer maxShardsPerNode;
private final Boolean readOnly;
- private final Map stateUpdates;
+ private volatile Map stateUpdates;
private final Long id;
private AtomicInteger sliceAssignCnt = new AtomicInteger();
- private volatile boolean createdLazy;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
this(name, slices, props, router, -1, null);
@@ -265,11 +264,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
@Override
public String toString() {
- return "DocCollection("+name+":" + ":v=" + znodeVersion + " u=" + stateUpdates + " l=" + createdLazy + ")=" + toJSONString(this);
- }
-
- public void setCreatedLazy() {
- this.createdLazy = true;
+ return "DocCollection("+name+":" + ":v=" + znodeVersion + " u=" + stateUpdates + ")=" + toJSONString(this);
}
@Override
@@ -451,6 +446,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return stateUpdates != null;
}
+ public void setStateUpdates(Map stateUpdates) {
+ this.stateUpdates = stateUpdates;
+ }
+
public void setSliceAssignCnt(int i) {
sliceAssignCnt.set(i);
}
@@ -466,4 +465,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public void setZnodeVersion(int version) {
this.znodeVersion = version;
}
+
+ public Map<String, Slice> getSlicesCopy() {
+ return new LinkedHashMap<>(slices);
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index ae9f960..3f58db4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -266,7 +266,6 @@ public class Replica extends ZkNodeProps {
// only to be used by ZkStateWriter currently
public void setState(State state) {
this.state = state;
- propMap.put(ZkStateReader.STATE_PROP, state.toString());
}
public boolean isActive(Set<String> liveNodes) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 2a22b9c..1aaa259 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -279,7 +279,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
private Replica findLeader() {
for (Replica replica : replicas.values()) {
String leaderStr = replica.getStr(LEADER);
- if (leaderStr != null && leaderStr.equals("true") && replica.getState() == Replica.State.ACTIVE) {
+ if (leaderStr != null && leaderStr.equals("true")) {
return replica;
}
}
@@ -347,7 +347,6 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
// only to be used by ZkStateWriter currently
public void setState(State state) {
this.state = state;
- propMap.put(ZkStateReader.STATE_PROP, state.toString());
}
// only to be used by ZkStateWriter currently
@@ -365,7 +364,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
@Override
public String toString() {
- return name + ':' + toJSONString(propMap);
+ return name + "[" + leader + "]" + ':' + toJSONString(propMap);
}
@Override
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 2a48875..454e30c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -490,6 +490,11 @@ public class SolrZkClient implements Closeable {
}
}
+ public void create(final String path, final byte data[], CreateMode createMode, AsyncCallback.Create2Callback cb) throws KeeperException, InterruptedException {
+ List<ACL> acls = zkACLProvider.getACLsToAdd(path);
+ connManager.getKeeper().create(path, data, acls, createMode, cb, "create", -1);
+ }
+
public String create(final String path, final byte[] data, final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
return create(path, data, createMode, retryOnConnLoss, false);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 30de809..256859d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.CloudInspectUtil;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.Callable;
@@ -67,8 +68,11 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.CloseTracker;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.TimeOut;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.zookeeper.AddWatchMode;
@@ -197,6 +201,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
*/
private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ReentrantLock> collectionStateLocks = new ConcurrentHashMap<>(32, 0.75f, 16);
+
/**
* Watchers of Collection properties
*/
@@ -260,7 +266,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public boolean canBeRemoved() {
int refCount = coreRefCount.get();
int watcherCount = stateWatchers.size();
- log.trace("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
+ log.debug("{} watcher can be removed coreRefCount={}, stateWatchers={}", collection, refCount, watcherCount);
return refCount <= 0 && watcherCount <= 0;
}
@@ -382,27 +388,38 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
refreshCollectionList();
refreshLiveNodes();
+
// Need a copy so we don't delete from what we're iterating over.
watchedCollectionStates.forEach((name, coll) -> {
DocCollection newState = null;
+ ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
+ collectionStateLock.lock();
try {
- newState = fetchCollectionState(name);
- } catch (Exception e) {
- log.error("problem fetching update collection state", e);
- return;
- }
+ try {
+ newState = fetchCollectionState(name);
+ } catch (Exception e) {
+ log.error("problem fetching update collection state", e);
+ return;
+ }
+
+ String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
+ try {
+ getAndProcessStateUpdates(name, stateUpdatesPath, newState, true);
+ } catch (Exception e) {
+ log.error("Error fetching state updates", e);
+ }
- if (updateWatchedCollection(name, newState, false)) {
- constructState(newState);
+ if (updateWatchedCollection(name, newState == null ? null : new ClusterState.CollectionRef(newState))) {
+ constructState(newState, "forciblyRefreshAllClusterStateSlow");
+ }
+ } finally {
+ collectionStateLock.unlock();
}
});
- } catch (KeeperException e) {
+ } catch (Exception e) {
log.error("", e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
@@ -418,11 +435,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
DocCollection newState = fetchCollectionState(name);
- if (newState == null) {
- return;
+ String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
+ try {
+ getAndProcessStateUpdates(name, stateUpdatesPath, newState, true);
+ } catch (Exception e) {
+ log.error("Error fetching state updates", e);
}
- if (updateWatchedCollection(name, newState, false)) {
+ if (updateWatchedCollection(name, newState == null ? null : new ClusterState.CollectionRef(newState))) {
updatedCollections.add(newState);
}
@@ -442,30 +462,22 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
refreshLiveNodes();
}
- public Integer compareStateVersions(String coll, int version) {
+ public Integer compareStateVersions(String coll, int version, int updateHash) {
+ log.debug("compareStateVersions {} {} {}", coll, version, updateHash);
DocCollection collection = getCollectionOrNull(coll);
if (collection == null) return null;
- if (collection.getZNodeVersion() < version) {
+ if (collection.getZNodeVersion() != version || (collection.getZNodeVersion() == version && collection.hasStateUpdates() && updateHash != collection.getStateUpdates().hashCode())) {
if (log.isDebugEnabled()) {
log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
}
DocCollection nu = getCollectionLive(coll);
+ log.debug("got collection {} {} {}", nu);
if (nu == null) return -3;
- if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
- if (updateWatchedCollection(coll, nu, false)) {
- constructState(nu);
- String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
- try {
- nu = getAndProcessStateUpdates(coll, stateUpdatesPath, true, nu, null);
- } catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- }
- collection = nu;
- }
+
+ constructState(nu, "compareStateVersions");
}
- if (collection.getZNodeVersion() == version) {
+ if (collection.getZNodeVersion() == version && (!collection.hasStateUpdates() || updateHash == collection.getStateUpdates().hashCode())) {
return null;
}
@@ -545,14 +557,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
// on reconnect of SolrZkClient force refresh and re-add watches.
loadClusterProperties();
- IOUtils.closeQuietly(liveNodesWatcher);
-
this.liveNodesWatcher = new LiveNodeWatcher();
this.liveNodesWatcher.createWatch();
this.liveNodesWatcher.refresh();
- IOUtils.closeQuietly(collectionsChildWatcher);
this.collectionsChildWatcher = new CollectionsChildWatcher();
this.collectionsChildWatcher.createWatch();
@@ -622,10 +631,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
});
}
- private void constructState(DocCollection collection) {
- constructState(collection, "general");
- }
-
/**
* Construct the total state view from all sources.
*
@@ -634,27 +639,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
*/
private void constructState(DocCollection collection, String caller) {
- if (log.isDebugEnabled()) log.trace("construct new cluster state on structure change {} {}", caller, collection);
-
-
+ log.trace("construct new cluster state on structure change {} {}", caller, collection);
log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]", collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(),
clusterState.keySet());
-//
-// watchedCollectionStates.forEach((s, slices) -> {
-// clusterState.putIfAbsent(s, new ClusterState.CollectionRef(slices));
-// lazyCollectionStates.remove(s);
-// });
-//
-// // Finally, add any lazy collections that aren't already accounted for.
-// lazyCollectionStates.forEach((s, lazyCollectionRef) -> {
-// clusterState.putIfAbsent(s, lazyCollectionRef);
-// });
- if (collection != null) {
- this.clusterState.put(collection.getName(), new ClusterState.CollectionRef(collection));
- }
-
notifyCloudCollectionsListeners(true);
if (collection != null) {
@@ -680,6 +669,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
log.debug("found collections {}", children);
// First, drop any children that disappeared.
this.lazyCollectionStates.keySet().retainAll(children);
+ log.debug("lazyCollectionState retained collections {}", children);
for (String coll : children) {
// We will create an eager collection for any interesting collections, so don't add to lazy.
if (!collectionWatches.containsKey(coll)) {
@@ -690,6 +680,8 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
LazyCollectionRef old = lazyCollectionStates.putIfAbsent(coll, docRef);
if (old == null) {
clusterState.put(coll, docRef);
+ ReentrantLock collectionStateLock = new ReentrantLock(true);
+ ReentrantLock oldLock = collectionStateLocks.putIfAbsent(coll, collectionStateLock);
log.debug("Created lazy collection {} interesting [{}] watched [{}] lazy [{}] total [{}]", coll, collectionWatches.keySet().size(),
watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.size());
@@ -698,30 +690,30 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
-// List<String> finalChildren = children;
-// watchedCollectionStates.keySet().forEach(col -> {
-// if (!finalChildren.contains(col)) {
-// log.debug("remove watched collection state due to live node {}", col);
-// watchedCollectionStates.remove(col);
-// CollectionStateWatcher sw = stateWatchersMap.remove(col);
-// if (sw != null) sw.removeWatch();
-// IOUtils.closeQuietly(sw);
-// if (collectionRemoved != null) {
-// collectionRemoved.removed(col);
-// }
-// if (sw != null) {
-// ReentrantLock lock = sw.collectionStateLock;
-// if (lock != null) {
-// lock.lock();
-// try {
-// clusterState.getCollectionStates().remove(col);
-// } finally {
-// lock.unlock();
-// }
-// }
-// }
-// }
-// });
+ List<String> finalChildren = children;
+ watchedCollectionStates.keySet().forEach(col -> {
+ if (!finalChildren.contains(col)) {
+ log.debug("remove watched collection state due to removed collections node {}", col);
+ watchedCollectionStates.remove(col);
+ CollectionStateWatcher sw = stateWatchersMap.remove(col);
+ if (sw != null) sw.removeWatch();
+ IOUtils.closeQuietly(sw);
+ if (collectionRemoved != null) {
+ collectionRemoved.removed(col);
+ }
+ if (sw != null) {
+ ReentrantLock collectionStateLock = collectionStateLocks.get(col);
+ if (collectionStateLock != null) {
+ collectionStateLock.lock();
+ try {
+ clusterState.remove(col);
+ } finally {
+ collectionStateLock.unlock();
+ }
+ }
+ }
+ }
+ });
}
@@ -760,14 +752,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
boolean fire = true;
newCollections = getCurrentCollections();
- oldCollections = lastFetchedCollectionSet.getAndSet(newCollections);
- if (!newCollections.equals(oldCollections) || notifyIfSame) {
- fire = true;
- }
+// oldCollections = lastFetchedCollectionSet.getAndSet(newCollections);
+// if (!newCollections.equals(oldCollections) || notifyIfSame) {
+// fire = true;
+// }
log.trace("Should fire listeners? {} listeners={}", fire, cloudCollectionsListeners.size());
if (fire) {
- cloudCollectionsListeners.forEach(new CloudCollectionsListenerConsumer(oldCollections, newCollections));
+ cloudCollectionsListeners.forEach(new CloudCollectionsListenerConsumer(newCollections, newCollections));
}
}
@@ -810,7 +802,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
DocCollection cdc = getCollectionLive(collName);
if (cdc != null) {
- cdc.setCreatedLazy();
lastUpdateTime = System.nanoTime();
cachedDocCollection = cdc;
return cdc;
@@ -960,15 +951,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public Replica getLeader(String collection, String shard) {
- return getLeader(getCollectionOrNull(collection), shard);
- }
-
- private Replica getLeader(DocCollection docCollection, String shard) {
- Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
- if (replica != null && replica.getState() == Replica.State.ACTIVE) {
- return replica;
+ try {
+ return getLeaderRetry(collection, shard, 5000);
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ } catch (TimeoutException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
- return null;
}
// public Replica getLeader(String collection, String shard) {
@@ -994,75 +983,79 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
* Get shard leader properties, with retry if none exist.
*/
public Replica getLeaderRetry(String collection, String shard) throws InterruptedException, TimeoutException {
- return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
+ return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT, false);
}
- /**
- * Get shard leader properties, with retry if none exist.
- */
public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException, TimeoutException {
- return getLeaderRetry(collection, shard, timeout, true);
+ return getLeaderRetry(collection, shard, timeout, false);
}
/**
* Get shard leader properties, with retry if none exist.
*/
- public Replica getLeaderRetry(String collection, String shard, int timeout, boolean mustBeLive) throws InterruptedException, TimeoutException {
- DocCollection coll = getCollectionOrNull(collection);
- if (coll != null) {
- Slice slice = coll.getSlice(shard);
- if (slice != null) {
- Replica leader = slice.getLeader();
- if (leader != null && isNodeLive(leader.getNodeName())) {
- return leader;
- }
- }
- }
+ public Replica getLeaderRetry(String collection, String shard, int timeout, boolean checkValidLeader) throws InterruptedException, TimeoutException {
AtomicReference<Replica> returnLeader = new AtomicReference<>();
- try {
- waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
- if (c == null)
- return false;
- Slice slice = c.getSlice(shard);
- if (slice == null) return false;
- Replica zkLeader = null;
- Replica leader = slice.getLeader();
- if (leader != null && leader.getState() == Replica.State.ACTIVE) {
- if (isNodeLive(leader.getNodeName())) {
+ DocCollection coll;
+ int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
+ TimeOut leaderVerifyTimeout = new TimeOut(timeout, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (true) {
+
+ try {
+ waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
+ if (c == null) return false;
+ Slice slice = c.getSlice(shard);
+ if (slice == null) return false;
+ Replica leader = slice.getLeader();
+ if (leader != null) {
+ if (leader.getState() != Replica.State.ACTIVE) {
+ return false;
+ }
+
returnLeader.set(leader);
return true;
}
- }
- // MRM TODO: remove
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE) {
- if (isNodeLive(replica.getNodeName())) {
- returnLeader.set(replica);
- return true;
- }
+
+ return false;
+ });
+ } catch (TimeoutException e) {
+ coll = getCollectionOrNull(collection);
+ throw new TimeoutException(
+ "No registered leader was found after waiting for " + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection)
+ + " with live_nodes=" + liveNodes);
+ }
+
+ Replica leader = returnLeader.get();
+ if (checkValidLeader) {
+ try (Http2SolrClient client = new Http2SolrClient.Builder("").idleTimeout(readTimeout).markInternalRequest().build()) {
+ CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+ prepCmd.setCoreName(leader.getName());
+ prepCmd.setLeaderName(leader.getName());
+ prepCmd.setCollection(leader.getCollection());
+ prepCmd.setShardId(leader.getSlice());
+
+ prepCmd.setBasePath(leader.getBaseUrl());
+
+ try {
+ NamedList<Object> result = client.request(prepCmd);
+ break;
+ } catch (Exception e) {
+ log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
}
}
+ if (leaderVerifyTimeout.hasTimedOut()) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) + " with live_nodes=" + liveNodes);
+ }
- return false;
- });
- } catch (TimeoutException e) {
- coll = getCollectionOrNull(collection);
- throw new TimeoutException("No registered leader was found after waiting for "
- + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection)
- + " with live_nodes=" + liveNodes + " zkLeaderNode=" + (coll == null ? "null collection" : getLeaderProps(collection, coll.getId(), shard)));
+ } else {
+ break;
+ }
}
-
- Replica leader = returnLeader.get();
-
- if (leader == null) {
- coll = getCollectionOrNull(collection);
- throw new SolrException(ErrorCode.SERVER_ERROR, "No registered leader was found "
- + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection)
- + " with live_nodes=" + liveNodes + " zkLeaderNode=" + getLeaderProps(collection, coll.getId(), shard));
+ if (returnLeader.get() == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "No registered leader was found " + "collection: " + collection + " slice: " + shard + " saw state=" + clusterState.get(collection) + " with live_nodes=" + liveNodes);
}
-
- return leader;
+ return returnLeader.get();
}
private Replica getLeaderProps(final String collection, long collId, final String slice) {
@@ -1384,8 +1377,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
class CollectionStateWatcher implements Watcher, Closeable {
private final String coll;
private volatile StateUpdateWatcher stateUpdateWatcher;
-
- private final ReentrantLock collectionStateLock = new ReentrantLock();
+
private volatile boolean closed;
CollectionStateWatcher(String coll) {
@@ -1422,13 +1414,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
* with the results of the refresh.
*/
public void refresh() {
+ ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
collectionStateLock.lock();
try {
DocCollection newState = fetchCollectionState(coll);
-
- if (!updateWatchedCollection(coll, newState, false)) {
+ String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
+ newState = getAndProcessStateUpdates(coll, stateUpdatesPath, newState, true);
+ if (!updateWatchedCollection(coll, newState == null ? null : new ClusterState.CollectionRef(newState))) {
return;
}
+
constructState(newState, "state.json watcher");
} catch (Exception e) {
log.error("A ZK error has occurred refreshing CollectionStateWatcher for collection={}", coll, e);
@@ -1458,7 +1453,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
String collectionCSNPath = getCollectionSCNPath(coll);
try {
zkClient.removeWatches(collectionCSNPath, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -1466,7 +1461,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
zkClient.removeWatches(stateUpdateWatcher.stateUpdatesPath, stateUpdateWatcher, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -1475,12 +1470,14 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public void refreshStateUpdates() {
if (log.isDebugEnabled()) log.debug("fetch additional state updates {}", coll);
-
+ ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
try {
collectionStateLock.lock();
- getAndProcessStateUpdates(coll, stateUpdateWatcher.stateUpdatesPath, false, getCollectionOrNull(coll), collectionStateLock);
+ getAndProcessStateUpdates(coll, stateUpdateWatcher.stateUpdatesPath, getCollectionOrNull(coll), false);
} catch (Exception e) {
log.error("Unwatched collection: [{}]", coll, e);
+ } finally {
+ collectionStateLock.unlock();
}
}
@@ -1506,14 +1503,16 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public void process(WatchedEvent event) {
if (zkClient.isClosed() || closed) return;
log.trace("_statupdates event {}", event);
-
+ ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
+ collectionStateLock.lock();
try {
- collectionStateLock.lock();
- getAndProcessStateUpdates(coll, stateUpdatesPath, false, getCollectionOrNull(coll), collectionStateLock);
+ getAndProcessStateUpdates(coll, stateUpdatesPath, getCollectionOrNull(coll), false);
} catch (AlreadyClosedException e) {
} catch (Exception e) {
log.error("Unwatched collection: [{}]", coll, e);
+ } finally {
+ collectionStateLock.unlock();
}
}
}
@@ -1570,7 +1569,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
zkClient.removeWatches(znodePath, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
if (log.isDebugEnabled()) log.debug("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
@@ -1627,7 +1626,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
*/
// MRM TODO: persistent watch
class CollectionsChildWatcher implements Watcher, Closeable {
- volatile boolean watchRemoved = true;
+
@Override
public void process(WatchedEvent event) {
if (ZkStateReader.this.closed) {
@@ -1667,7 +1666,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public void createWatch() {
- watchRemoved = false;
try {
zkClient.addWatch(COLLECTIONS_ZKNODE, this, AddWatchMode.PERSISTENT);
} catch (Exception e) {
@@ -1676,11 +1674,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public void removeWatch() {
- if (watchRemoved) return;
- watchRemoved = true;
try {
zkClient.removeWatches(COLLECTIONS_ZKNODE, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.warn("Exception removing watch", e);
@@ -1697,7 +1693,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
* Watches the live_nodes and syncs changes.
*/
class LiveNodeWatcher implements Watcher, Closeable {
- volatile boolean watchRemoved = true;
+
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher
@@ -1725,7 +1721,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public void createWatch() {
- watchRemoved = false;
try {
zkClient.addWatch(LIVE_NODES_ZKNODE, this, AddWatchMode.PERSISTENT);
} catch (Exception e) {
@@ -1735,13 +1730,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public void removeWatch() {
- if (watchRemoved) {
- return;
- }
- watchRemoved = true;
try {
zkClient.removeWatches(LIVE_NODES_ZKNODE, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.warn("Exception removing watch", e);
@@ -1757,37 +1748,34 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public DocCollection getCollectionLive(String coll) {
log.debug("getCollectionLive {}", coll);
DocCollection newState;
+
+ ReentrantLock collectionStateLock = collectionStateLocks.get(coll);
+ collectionStateLock.lock();
try {
newState = fetchCollectionState(coll);
- if (newState != null) {
- String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
- try {
- newState = getAndProcessStateUpdates(coll, stateUpdatesPath, true, newState, null);
- } catch (Exception e) {
- log.error("", e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- }
+ String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
+ newState = getAndProcessStateUpdates(coll, stateUpdatesPath, newState, true);
+ // constructState(newState, "getCollectionLive");
} catch (KeeperException e) {
log.warn("Zookeeper error getting latest collection state for collection={}", coll, e);
return null;
- } catch (InterruptedException e) {
- log.error("Unwatched collection: [{}]", coll, e);
- throw new SolrException(ErrorCode.SERVER_ERROR, "interrupted", e);
+ } catch (Exception e) {
+ log.error("Exception getting fetching collection state: [{}]", coll, e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Exception getting fetching collection state: " + coll, e);
+ } finally {
+ collectionStateLock.unlock();
}
return newState;
-
}
- private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, boolean live, DocCollection docCollection, ReentrantLock collectionStateLock) throws KeeperException, InterruptedException {
- DocCollection result = null;
+ private DocCollection getAndProcessStateUpdates(String coll, String stateUpdatesPath, DocCollection docCollection, boolean live) throws KeeperException, InterruptedException {
try {
log.trace("get and process state updates for {}", coll);
Stat stat;
try {
- stat = getZkClient().exists(stateUpdatesPath, null,true);
+ stat = getZkClient().exists(stateUpdatesPath, null, true, true);
if (stat == null) {
return docCollection;
}
@@ -1798,7 +1786,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (docCollection != null && docCollection.hasStateUpdates()) {
int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
- if (stat.getVersion() <= oldVersion) {
+ if (stat.getVersion() < oldVersion) {
if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
return docCollection;
}
@@ -1815,132 +1803,124 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (data == null) {
log.info("No data found for {}", stateUpdatesPath);
+ // docCollection.getStateUpdates().clear();
return docCollection;
}
Map<String,Object> m = (Map) fromJSON(data);
- if (log.isDebugEnabled()) log.debug("Got additional state updates {}", m);
+ log.trace("Got additional state updates {}", m);
if (m.size() == 0) {
return docCollection;
}
Integer version = Integer.parseInt((String) m.get("_cs_ver_"));
- if (log.isDebugEnabled()) log.debug("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
+ log.trace("Got additional state updates with znode version {} for cs version {} updates={}", stat.getVersion(), version, m);
- m.remove("_cs_ver_");
+ //m.remove("_cs_ver_");
m.put("_ver_", stat.getVersion());
- try {
- Set<Entry<String,Object>> entrySet = m.entrySet();
- if (docCollection != null) {
- if (version < docCollection.getZNodeVersion()) {
- if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+ Set<Entry<String,Object>> entrySet = m.entrySet();
+
+ if (docCollection != null) {
+ if (version < docCollection.getZNodeVersion()) {
+ if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older state.json {}, ours is now {}", version, docCollection.getZNodeVersion());
+ return docCollection;
+ }
+
+ if (docCollection.hasStateUpdates()) {
+ int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
+ if (stat.getVersion() < oldVersion) {
+ if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
return docCollection;
}
+ }
- if (docCollection.hasStateUpdates()) {
- int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
- if (stat.getVersion() < oldVersion) {
- if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
- return docCollection;
- }
+
+ for (Entry<String,Object> entry : entrySet) {
+ String id = entry.getKey();
+ if (id.equals("_ver_") || id.equals("_cs_ver_")) continue;
+ Replica.State state = null;
+ if (!entry.getValue().equals("l")) {
+ state = Replica.State.shortStateToState((String) entry.getValue());
}
- for (Entry<String,Object> entry : entrySet) {
- String id = entry.getKey();
- if (id.equals("_ver_")) continue;
- Replica.State state = null;
- if (!entry.getValue().equals("l")) {
- state = Replica.State.shortStateToState((String) entry.getValue());
- }
+ Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
+ log.trace("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica.getName(), id, docCollection.getReplicaByIds());
+
+ if (replica != null) {
+
+ // if (replica.getState() != state || entry.getValue().equals("l")) {
+ Slice slice = docCollection.getSlice(replica.getSlice());
+ Map<String,Replica> replicasMap = slice.getReplicasCopy();
+ Map properties = new HashMap(replica.getProperties());
- Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
- if (log.isTraceEnabled()) log.trace("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica.getName(), id, docCollection.getReplicaByIds());
-
- if (replica != null) {
-
- // if (replica.getState() != state || entry.getValue().equals("l")) {
- Slice slice = docCollection.getSlice(replica.getSlice());
- Map<String,Replica> replicasMap = new HashMap(slice.getReplicasMap());
- Map properties = new HashMap(replica.getProperties());
- if (entry.getValue().equals("l")) {
- if (log.isDebugEnabled()) log.debug("state is leader, set to active and leader prop");
- properties.put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
- properties.put("leader", "true");
-
- for (Replica r : replicasMap.values()) {
- if (replica.getName().equals(r.getName())) {
- continue;
- }
- log.trace("process non leader {} {}", r, r.getProperty(LEADER_PROP));
- if ("true".equals(r.getProperties().get(LEADER_PROP))) {
- log.debug("remove leader prop {}", r);
- Map<String,Object> props = new HashMap<>(r.getProperties());
- props.remove(LEADER_PROP);
- Replica newReplica = new Replica(r.getName(), props, coll, docCollection.getId(), r.getSlice(), ZkStateReader.this);
- replicasMap.put(r.getName(), newReplica);
- }
+ if (entry.getValue().equals("l")) {
+ log.trace("state is leader, set to active and leader prop id={}", replica.getId());
+ properties.put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE);
+ properties.put(LEADER_PROP, "true");
+
+ for (Replica r : replicasMap.values()) {
+ if (replica.getName().equals(r.getName())) {
+ continue;
}
- } else if (state != null && !properties.get(ZkStateReader.STATE_PROP).equals(state.toString())) {
- log.trace("std state, set to {}", state);
- properties.put(ZkStateReader.STATE_PROP, state.toString());
- if ("true".equals(properties.get(LEADER_PROP))) {
- properties.remove(LEADER_PROP);
+ log.trace("process non leader {} {}", r, r.getProperty(LEADER_PROP));
+ if ("true".equals(r.getProperties().get(LEADER_PROP))) {
+ log.debug("remove leader prop {}", r);
+ Map<String,Object> props = new HashMap<>(r.getProperties());
+ props.remove(LEADER_PROP);
+ Replica newReplica = new Replica(r.getName(), props, coll, docCollection.getId(), r.getSlice(), ZkStateReader.this);
+ replicasMap.put(r.getName(), newReplica);
}
}
+ } else if (state != null) {
+ log.trace("std state, set to {}", state);
+ properties.put(ZkStateReader.STATE_PROP, state.toString());
+ if ("true".equals(properties.get(LEADER_PROP))) {
+ properties.remove(LEADER_PROP);
+ }
+ }
- Replica newReplica = new Replica(replica.getName(), properties, coll, docCollection.getId(), replica.getSlice(), ZkStateReader.this);
-
- log.trace("add new replica {}", newReplica);
+ Replica newReplica = new Replica(replica.getName(), properties, coll, docCollection.getId(), replica.getSlice(), ZkStateReader.this);
- replicasMap.put(replica.getName(), newReplica);
+ log.trace("add new replica {}", newReplica);
- Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), coll, replica.id, ZkStateReader.this);
- // if (newReplica.getProperty("leader") != null) {
- // newSlice.setLeader(newReplica);
- // }
+ replicasMap.put(replica.getName(), newReplica);
- Map<String,Slice> newSlices = new HashMap<>(docCollection.getSlicesMap());
- newSlices.put(slice.getName(), newSlice);
+ Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), coll, docCollection.getId(), ZkStateReader.this);
- log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
+ Map<String,Slice> newSlices = docCollection.getSlicesCopy();
+ newSlices.put(slice.getName(), newSlice);
- DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), docCollection.getZNodeVersion(), m);
- docCollection = newDocCollection;
+ log.trace("add new slice leader={} {}", newSlice.getLeader(), newSlice);
- result = newDocCollection;
+ DocCollection newDocCollection = new DocCollection(coll, newSlices, docCollection.getProperties(), docCollection.getRouter(), version, m);
+ docCollection = newDocCollection;
- // }
- } else {
- if (log.isDebugEnabled()) log.debug("Could not find core to update local state {} {}", id, state);
- }
- }
- if (result == null) return docCollection;
- if (!live) {
- watchedCollectionStates.put(result.getName(), result);
-
- // Finally, add any lazy collections that aren't already accounted for.
- // lazyCollectionStates.forEach((s, lazyCollectionRef) -> {
- // if (!s.equals(coll)) {
- // result.putIfAbsent(s, lazyCollectionRef);
- // }
- //
- // });
-
- log.trace("Set a new clusterstate based on update diff {}", result);
-
- updateWatchedCollection(coll, result, false);
- constructState(result);
+ } else {
+ if (log.isDebugEnabled()) log.debug("Could not find core to update local state {} {}", id, state);
}
}
- } catch (Exception e) {
- log.error("exeption trying to process additional updates", e);
+
+ log.trace("Set a new clusterstate based on update diff {} live={}", docCollection, live);
+
+ if (live) {
+ return docCollection;
+ }
+
+ if (!updateWatchedCollection(coll, docCollection == null ? null : new ClusterState.CollectionRef(docCollection))) {
+ return docCollection;
+ }
+
+ constructState(docCollection, "state.json state updates watcher");
+
}
- return result == null ? docCollection : result;
- } finally {
- if (collectionStateLock != null) collectionStateLock.unlock();
+
+ } catch (Exception e) {
+ log.error("exeption trying to process additional updates", e);
}
+ return docCollection == null ? docCollection : docCollection;
+
}
private DocCollection fetchCollectionState(String coll) throws KeeperException, InterruptedException {
@@ -1950,46 +1930,51 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
int version = 0;
- Stat stateStat = zkClient.exists(collectionPath, null, true, false);
- if (stateStat != null) {
- version = stateStat.getVersion();
- if (log.isDebugEnabled()) log.debug("version for cs is {}", version);
- // version we would get
- DocCollection docCollection = watchedCollectionStates.get(coll);
- if (docCollection != null) {
- int localVersion = docCollection.getZNodeVersion();
- if (log.isDebugEnabled())
- log.debug("found version {}, our local version is {}, has updates {}", version, localVersion, docCollection.hasStateUpdates());
- if (docCollection.hasStateUpdates()) {
- if (localVersion == version) {
- return docCollection;
- }
- } else {
- if (localVersion == version) {
- return docCollection;
+
+ // version we would get
+ DocCollection docCollection = null;
+
+ ClusterState.CollectionRef collRef = clusterState.get(coll);
+ if (collRef != null && !collRef.isLazilyLoaded()) {
+ docCollection = collRef.get();
+ }
+
+ if (collRef != null) {
+ Stat stateStat = zkClient.exists(collectionPath, null, true, false);
+ if (stateStat != null) {
+ version = stateStat.getVersion();
+
+ if (log.isDebugEnabled()) log.debug("version for cs is {}, local version is {}", version, docCollection == null ? null : docCollection.getZNodeVersion());
+ if (docCollection != null) {
+ int localVersion = docCollection.getZNodeVersion();
+ if (log.isDebugEnabled()) log.debug("found version {}, our local version is {}, has updates {}", version, localVersion, docCollection.hasStateUpdates());
+ if (docCollection.hasStateUpdates()) {
+ if (localVersion > version) {
+ return docCollection;
+ }
}
}
- }
-// if (lazyCollectionStates.containsKey(coll)) {
-// LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
-// DocCollection cachedCollection = lazyColl.getCachedDocCollection();
-// if (cachedCollection != null) {
-// int localVersion = cachedCollection.getZNodeVersion();
-// if (cachedCollection.hasStateUpdates()) {
-// if (localVersion == version) {
-// return cachedCollection;
-// }
-// } else {
-// if (localVersion == version) {
-// return cachedCollection;
-// }
-// }
-// }
-// }
+ // if (lazyCollectionStates.containsKey(coll)) {
+ // LazyCollectionRef lazyColl = lazyCollectionStates.get(coll);
+ // DocCollection cachedCollection = lazyColl.getCachedDocCollection();
+ // if (cachedCollection != null) {
+ // int localVersion = cachedCollection.getZNodeVersion();
+ // if (cachedCollection.hasStateUpdates()) {
+ // if (localVersion == version) {
+ // return cachedCollection;
+ // }
+ // } else {
+ // if (localVersion == version) {
+ // return cachedCollection;
+ // }
+ // }
+ // }
+ // }
- } else {
- return null;
+ } else {
+ return null;
+ }
}
log.debug("getting latest state.json");
Stat stat = new Stat();
@@ -2005,7 +1990,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
log.debug("no data found at state.json node");
return null;
}
- DocCollection docCollection = ClusterState.createDocCollectionFromJson(this, stat.getVersion(), data);
+ docCollection = ClusterState.createDocCollectionFromJson(this, stat.getVersion(), data);
return docCollection;
}
@@ -2039,7 +2024,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
*/
public void registerCore(String collection, String coreName) {
- if (log.isDebugEnabled()) log.debug("register core for collection {}", collection);
+ if (log.isDebugEnabled()) log.debug("register core for collection {} {}", collection, coreName);
if (collection == null) {
throw new IllegalArgumentException("Collection cannot be null");
}
@@ -2048,21 +2033,23 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return;
}
+ AtomicReference<CollectionStateWatcher> createSw = new AtomicReference();
+
collectionWatches.compute(collection, (k, v) -> {
if (v == null) {
v = new CollectionWatch<>(collection);
CollectionStateWatcher sw = new CollectionStateWatcher(collection);
- stateWatchersMap.put(collection, sw);
sw.createWatch();
- sw.refresh();
- sw.refreshStateUpdates();
- v.coreRefCount.incrementAndGet();
- return v;
+ stateWatchersMap.put(collection, sw);
+ createSw.set(sw);
}
v.coreRefCount.incrementAndGet();
return v;
});
-
+ CollectionStateWatcher sw = createSw.get();
+ if (sw != null) {
+ sw.refresh();
+ }
}
public boolean watched(String collection) {
@@ -2085,29 +2072,35 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
throw new IllegalArgumentException("Collection cannot be null");
}
-// if (registeredCores.remove(coreName)) {
-// return;
-// }
+ if (registeredCores.remove(coreName)) {
+ return;
+ }
- AtomicBoolean reconstructState = new AtomicBoolean(false);
+ AtomicReference<CollectionStateWatcher> reconstructState = new AtomicReference();
collectionWatches.compute(collection, (k, v) -> {
if (v == null) return null;
if (v.coreRefCount.get() > 0)
v.coreRefCount.decrementAndGet();
if (v.canBeRemoved()) {
+ log.debug("no longer watch collection {}", collection);
watchedCollectionStates.remove(collection);
LazyCollectionRef docRef = new LazyCollectionRef(collection);
lazyCollectionStates.put(collection, docRef);
clusterState.put(collection, docRef);
-
- reconstructState.set(true);
+ CollectionStateWatcher stateWatcher = stateWatchersMap.remove(collection);
+ if (stateWatcher != null) {
+ reconstructState.set(stateWatcher);
+ }
return null;
}
return v;
});
- if (reconstructState.get()) {
- constructState(null);
+
+ CollectionStateWatcher sw = reconstructState.get();
+ if (sw != null) {
+ sw.removeWatch();
+ IOUtils.closeQuietly(sw);
}
}
@@ -2157,34 +2150,26 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (collection == null) {
throw new IllegalArgumentException("Collection cannot be null");
}
-
+ AtomicReference<CollectionStateWatcher> watchSet = new AtomicReference<>();
collectionWatches.compute(collection, (k, v) -> {
if (v == null) {
- log.debug("creating CollectionStateWatcher and refreshing for {}", collection);
+ log.debug("creating CollectionStateWatcher for {} and refreshing", collection);
v = new CollectionWatch<>(collection);
CollectionStateWatcher sw = new CollectionStateWatcher(collection);
- stateWatchersMap.put(collection, sw);
-
sw.createWatch();
- sw.refresh();
- sw.refreshStateUpdates();
+ stateWatchersMap.put(collection, sw);
+ watchSet.set(sw);
}
- log.debug("Adding a DocCollectionWatcher for collection={} currentCount={}", collection, v.stateWatchers.size());
v.stateWatchers.add(docCollectionWatcher);
+ log.debug("Adding a DocCollectionWatcher for collection={} currentCount={}", collection, v.stateWatchers.size());
return v;
});
- DocCollection state = getCollectionOrNull(collection);
- boolean remove;
- try {
- remove = docCollectionWatcher.onStateChanged(state);
- } catch (Exception e) {
- log.error("Exception running DocCollectionWatcher collection={}");
- remove = true;
- }
- if (remove) {
- removeDocCollectionWatcher(collection, docCollectionWatcher);
- }
+ CollectionStateWatcher sw = watchSet.get();
+ if (sw != null) {
+ sw.refresh();
+ constructState(null, "registerDocCollectionWatcher");
+ }
}
@@ -2225,23 +2210,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException {
-// DocCollection coll = getCollectionOrNull(collection);
-// if (predicate.matches(liveNodes, coll)) {
-// return;
-// }
+ DocCollection coll = getCollectionOrNull(collection);
+ if (predicate.matches(getLiveNodes(), coll)) {
+ return;
+ }
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
org.apache.solr.common.cloud.CollectionStateWatcher watcher = new PredicateMatcher(predicate, latch, docCollection).invoke();
registerCollectionStateWatcher(collection, watcher);
try {
-
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit)) {
- DocCollection coll = getCollectionOrNull(collection);
- if (predicate.matches(getLiveNodes(), coll)) {
- return;
- }
-
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + "live=" + liveNodes
+ docCollection.get());
}
@@ -2255,11 +2234,15 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
+ waitForActiveCollection(collection, wait, unit, false, shards, totalReplicas, true);
+ }
+
+ public void waitForActiveCollection(String collection, long wait, TimeUnit unit, boolean justLeaders, int shards, int totalReplicas, boolean exact) {
log.debug("waitForActiveCollection: {} interesting [{}] watched [{}] lazy [{}] total [{}]", collection, collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(),
clusterState.size());
assert collection != null;
- CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
+ CollectionStatePredicate predicate = expectedShardsAndActiveReplicas(justLeaders, shards, totalReplicas, exact);
AtomicReference<DocCollection> state = new AtomicReference<>();
AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
@@ -2314,9 +2297,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
- if (predicate.matches(liveNodes)) {
- return;
- }
throw new TimeoutException("Timeout waiting for live nodes, currently they are: " + liveNodes);
} finally {
@@ -2356,7 +2336,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
* @see #registerDocCollectionWatcher
*/
public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
- log.debug("remove watcher for collection {}", collection);
if (collection == null) {
throw new IllegalArgumentException("Collection cannot be null");
@@ -2367,8 +2346,9 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
collectionWatches.compute(collection, (k, v) -> {
if (v == null) return null;
v.stateWatchers.remove(watcher);
+ log.debug("remove watcher for collection {} currentCount={}", collection, v.stateWatchers.size());
if (v.canBeRemoved()) {
- log.trace("no longer watch collection {}", collection);
+ log.debug("no longer watch collection {}", collection);
watchedCollectionStates.remove(collection);
LazyCollectionRef docRef = new LazyCollectionRef(collection);
lazyCollectionStates.put(collection, docRef);
@@ -2384,7 +2364,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return v;
});
if (reconstructState.get()) {
- constructState(null);
+ // constructState(null, "removeDocCollectionWatcher");
}
}
@@ -2406,57 +2386,138 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
// returns true if the state has changed
- private boolean updateWatchedCollection(String coll, DocCollection newState, boolean live) {
+ private boolean updateWatchedCollection(String coll, ClusterState.CollectionRef newState) {
+ log.trace("updateWatchedCollection for [{}] [{}]", coll, newState);
try {
if (newState == null) {
if (log.isDebugEnabled()) log.debug("Removing cached collection state for [{}]", coll);
- watchedCollectionStates.remove(coll);
- return true;
+ DocCollection prev = watchedCollectionStates.remove(coll);
+ if (prev != null) {
+ CollectionStateWatcher sw = stateWatchersMap.remove(coll);
+ if (sw != null) {
+ IOUtils.closeQuietly(sw);
+ sw.removeWatch();
+ }
+ clusterState.remove(coll);
+ collectionStateLocks.remove(coll);
+// LazyCollectionRef lazyRef = new LazyCollectionRef(coll);
+// lazyCollectionStates.put(coll, lazyRef);
+// clusterState.put(coll, lazyRef);
+
+ if (collectionRemoved != null) {
+ collectionRemoved.removed(coll);
+ }
+ }
+ constructState(null, "updateWatchedCollection");
+ return false;
}
-// if (live) {
-// return true;
-// }
+ DocCollection newDocState = newState.get();
+ AtomicBoolean update = new AtomicBoolean();
+ clusterState.compute(coll, (k, v) -> {
- boolean updated = false;
- // CAS update loop
- while (true) {
- if (!collectionWatches.containsKey(coll)) {
- break;
+ if (v == null) {
+ log.debug("new state, update");
+ update.set(true);
+ LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+ makeStateWatcher(coll, newDocState, prev);
+ return newState;
}
- DocCollection oldState = watchedCollectionStates.get(coll);
- if (oldState == null) {
- if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
- if (log.isDebugEnabled()) {
- log.debug("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion());
- }
- updated = true;
- break;
- }
- } else {
- if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
- // no change to state, but we might have been triggered by the addition of a
- // state watcher, so run notifications
- updated = true;
- break;
- }
- if (watchedCollectionStates.replace(coll, oldState, newState)) {
- if (log.isDebugEnabled()) {
- log.debug("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
- }
- updated = true;
- break;
+
+ ClusterState.CollectionRef docCollRef = clusterState.get(coll);
+ if (docCollRef == null) {
+ log.debug("no previous state found, update and notify");
+ update.set(true);
+ makeStateWatcher(coll, newDocState, null);
+ return newState;
+ }
+
+ if (docCollRef.isLazilyLoaded()) {
+ if (watchedCollectionStates.containsKey(coll)) {
+ update.set(true);
+ LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+ makeStateWatcher(coll, newDocState, prev);
+ return newState;
}
}
- }
- return true;
+ if (docCollRef.isLazilyLoaded()) {
+ log.debug("state was lazy loaded");
+ update.set(false);
+ return newState;
+ }
+
+ DocCollection docColl = docCollRef.get();
+ if (docColl == null) {
+ log.debug("null lazy state found, update and notify");
+ update.set(true);
+ LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+ makeStateWatcher(coll, newDocState, prev);
+ return newState;
+ }
+
+ if (newDocState.hasStateUpdates() && !docColl.hasStateUpdates() && newDocState.getZNodeVersion() >= docColl.getZNodeVersion()) {
+ log.debug("new state does have updates, replace >=");
+ update.set(true);
+ LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+ makeStateWatcher(coll, newDocState, prev);
+ return newState;
+ }
+
+ if (newDocState.hasStateUpdates() && docColl.hasStateUpdates() && newDocState.getZNodeVersion() >= docColl.getZNodeVersion() && !newDocState.getStateUpdates().equals(docColl.getStateUpdates())) {
+ log.debug("new state has same version but different updates");
+ update.set(true);
+ LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+ makeStateWatcher(coll, newDocState, prev);
+ return newState;
+ }
+
+ if (newDocState.getZNodeVersion() > docColl.getZNodeVersion()) {
+ log.debug("new state > old state, replace");
+ update.set(true);
+ LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+ makeStateWatcher(coll, newDocState, prev);
+ return newState;
+ }
+
+
+ log.debug("replace state {} {}", docColl.getZNodeVersion(), newDocState.getZNodeVersion());
+ update.set(true);
+ LazyCollectionRef prev = lazyCollectionStates.remove(coll);
+ makeStateWatcher(coll, newDocState, prev);
+
+ return newState;
+ });
+
+ return update.get();
} catch (Exception e) {
log.error("Failing updating clusterstate", e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
+ private void makeStateWatcher(String coll, DocCollection newDocState, LazyCollectionRef prev) {
+ AtomicReference<CollectionStateWatcher> createSw = new AtomicReference();
+ if (prev != null) {
+ watchedCollectionStates.put(coll, newDocState);
+ collectionWatches.compute(newDocState.getName(), (k, v) -> {
+ if (v == null) {
+ v = new CollectionWatch<>(newDocState.getName());
+ CollectionStateWatcher sw = new CollectionStateWatcher(newDocState.getName());
+ sw.createWatch();
+ stateWatchersMap.put(newDocState.getName(), sw);
+ createSw.set(sw);
+ }
+ v.coreRefCount.incrementAndGet();
+ return v;
+ });
+ CollectionStateWatcher sw = createSw.get();
+ if (sw != null) {
+ sw.refresh();
+ }
+ }
+ }
+
public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
collectionPropsObservers.compute(collection, (k, v) -> {
@@ -2507,7 +2568,11 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
private void notifyStateWatchers(String collection, DocCollection collectionState) {
- log.trace("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState);
+ if (this.closed) {
+ log.warn("Already closed, won't notify");
+ return;
+ }
+ log.trace("Notify state watchers [{}] {}", collectionWatches.keySet(), collectionState.getName());
try {
notifications.submit(new Notification(collection, collectionState, collectionWatches));
@@ -2538,7 +2603,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
MDCLoggingContext.setNode(node);
}
- log.debug("notify on state change {}", collectionWatches.keySet());
+ log.trace("notify on state change {}", collectionWatches.keySet());
List<DocCollectionWatcher> watchers = new ArrayList<>();
collectionWatches.compute(collection, (k, v) -> {
@@ -2588,7 +2653,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
// called by createClusterStateWatchersAndUpdate()
private void refreshAliases(AliasesManager watcher) throws KeeperException, InterruptedException {
- constructState(null);
+ constructState(null, "refreshAliases");
zkClient.exists(ALIASES, watcher);
aliasesManager.update();
}
@@ -2848,6 +2913,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
public static CollectionStatePredicate expectedShardsAndActiveReplicas(int expectedShards, int expectedReplicas, boolean exact) {
+ return expectedShardsAndActiveReplicas(false, expectedShards, expectedReplicas, exact);
+ }
+
+ public static CollectionStatePredicate expectedShardsAndActiveReplicas(boolean justLeaders, int expectedShards, int expectedReplicas, boolean exact) {
return (liveNodes, collectionState) -> {
if (collectionState == null)
return false;
@@ -2877,14 +2946,46 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (leader == null) {
log.debug("slice={}", slice);
return false;
+ } else {
+ if (leader.getState() != Replica.State.ACTIVE) {
+ return false;
+ }
+// CoreAdminRequest.WaitForState prepCmd = new CoreAdminRequest.WaitForState();
+// prepCmd.setCoreName(leader.getName());
+// prepCmd.setLeaderName(leader.getName());
+// prepCmd.setCollection(collectionState.getName());
+// prepCmd.setShardId(slice.getName());
+//
+// int readTimeout = Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "7000"));
+//
+// try (Http2SolrClient client = new Http2SolrClient.Builder(leader.getBaseUrl()).idleTimeout(readTimeout).markInternalRequest().build()) {
+//
+// prepCmd.setBasePath(leader.getBaseUrl());
+//
+// try {
+// NamedList<Object> result = client.request(prepCmd);
+// } catch (SolrServerException | BaseHttpSolrClient.RemoteSolrException e) {
+// log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+// return false;
+// } catch (IOException e) {
+// log.info("failed checking for leader {} {}", leader.getName(), e.getMessage());
+// return false;
+// }
+// }
}
- for (Replica replica : slice) {
- if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
- activeReplicas++;
+ if (!justLeaders) {
+ for (Replica replica : slice) {
+ if (replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+ activeReplicas++;
+ }
}
}
log.trace("slice is {} and active replicas is {}, expected {} liveNodes={}", slice.getName(), activeReplicas, expectedReplicas, liveNodes);
}
+
+ if (justLeaders) {
+ return true;
+ }
if (!exact) {
if (activeReplicas >= expectedReplicas) {
return true;
@@ -3083,7 +3184,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
public void close() throws IOException {
try {
zkClient.removeWatches(path, this, WatcherType.Any, true);
- } catch (KeeperException.NoWatcherException e) {
+ } catch (KeeperException.NoWatcherException | AlreadyClosedException e) {
} catch (Exception e) {
log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java b/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java
index d5c2f90..19ce16d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ObjectReleaseTracker.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class ObjectReleaseTracker {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static Map<Object,ObjectTrackerException> OBJECTS = new ConcurrentHashMap<>(64, 0.75f, 1);
+ public static Map<Object,ObjectTrackerException> OBJECTS = new ConcurrentHashMap<>(128, 0.75f, 5);
protected final static ThreadLocal<StringBuilder> THREAD_LOCAL_SB = new ThreadLocal<>();
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index 95fd03e..7be6ad6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -64,7 +64,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
*/
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
private final AtomicLong _lastShrink = new AtomicLong();
- private final Map<Runnable,Future> _threads = new ConcurrentHashMap<>(256);
+ private final Map<Runnable,Future> _threads = new ConcurrentHashMap<>(256, 0.75f, SysStats.PROC_COUNT);
private final Set<Future> _threadFutures = ConcurrentHashMap.newKeySet();
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index b66a644..eb66667 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -328,7 +328,7 @@ public class SolrTestCase extends Assert {
System.setProperty("useCompoundFile", "true");
System.setProperty("solr.tests.maxBufferedDocs", "1000");
- System.setProperty("solr.getleader.looptimeout", "500");
+ System.setProperty("solr.getleader.looptimeout", "1500");
System.setProperty("pkiHandlerPrivateKeyPath", SolrTestCaseJ4.class.getClassLoader().getResource("cryptokeys/priv_key512_pkcs8.pem").toExternalForm());
System.setProperty("pkiHandlerPublicKeyPath", SolrTestCaseJ4.class.getClassLoader().getResource("cryptokeys/pub_key512.der").toExternalForm());
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index d47ffc6..53b09bb 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -858,12 +858,12 @@ public class MiniSolrCloudCluster {
public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
if (collection == null) throw new IllegalArgumentException("null collection");
- waitForActiveCollection(collection, 120, TimeUnit.SECONDS, shards, totalReplicas);
+ waitForActiveCollection(collection, 60, TimeUnit.SECONDS, shards, totalReplicas);
}
public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) {
- waitForActiveCollection(collection, 120, TimeUnit.SECONDS, shards, totalReplicas, exact);
+ waitForActiveCollection(collection, 60, TimeUnit.SECONDS, shards, totalReplicas, exact);
}
public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {
@@ -881,7 +881,6 @@ public class MiniSolrCloudCluster {
log.info("waitForNode: {}", runner.getNodeName());
}
-
ZkStateReader reader = getSolrClient().getZkStateReader();
try {