You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/11 06:30:49 UTC
[lucene-solr] branch reference_impl_dev updated: @1159 More efficient state publish from zkcontroller.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new c29a883 @1159 More efficient state publish from zkcontroller.
c29a883 is described below
commit c29a88394ec905c356b7f00e1584aa469bfe62e8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Nov 11 00:13:50 2020 -0600
@1159 More efficient state publish from zkcontroller.
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 3 -
.../solr/cloud/ShardLeaderElectionContext.java | 6 +-
.../java/org/apache/solr/cloud/StatePublisher.java | 114 +++++++++++++
.../java/org/apache/solr/cloud/ZkController.java | 18 +-
.../org/apache/solr/cloud/ZkDistributedQueue.java | 4 -
.../apache/solr/cloud/overseer/ZkStateWriter.java | 181 ++++++++++++---------
.../apache/solr/handler/admin/PrepRecoveryOp.java | 78 +++------
.../client/solrj/request/CoreAdminRequest.java | 2 +-
8 files changed, 265 insertions(+), 141 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 1f88d0f..3890957 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -894,10 +894,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(coreName);
- prepCmd.setNodeName(zkController.getNodeName());
prepCmd.setState(Replica.State.RECOVERING);
- prepCmd.setCheckLive(true);
- prepCmd.setOnlyIfLeader(true);
prepCmd.setCollection(coreDescriptor.getCollectionName());
prepCmd.setShardId(coreDescriptor.getCloudDescriptor().getShardId());
final Slice.State state = slice.getState();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index f90c1495..b8b2ca5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -264,13 +264,13 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
ZkNodeProps zkNodes = ZkNodeProps
- .fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.NODE_NAME_PROP, leaderProps.get(ZkStateReader.NODE_NAME_PROP), ZkStateReader.CORE_NAME_PROP,
- leaderProps.getName(), ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+ .fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.CORE_NAME_PROP,
+ leaderProps.getName(), ZkStateReader.STATE_PROP, "leader");
assert zkController != null;
assert zkController.getOverseer() != null;
log.info("Publish leader state");
- zkController.getOverseer().offerStateUpdate(Utils.toJSON(zkNodes));
+ zkController.publish(zkNodes);
log.info("I am the new leader: " + leaderProps.getCoreUrl() + " " + shardId);
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
new file mode 100644
index 0000000..3cc1f93
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.ParWork;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class StatePublisher implements Closeable {
+ private static final Logger log = LoggerFactory
+ .getLogger(MethodHandles.lookup().lookupClass());
+ private final BlockingArrayQueue<ZkNodeProps> workQueue = new BlockingArrayQueue<>(30, 10);
+ private final ZkDistributedQueue overseerJobQueue;
+ private volatile Worker worker;
+ private volatile Future<?> workerFuture;
+
+ private volatile boolean terminated;
+ private class Worker implements Runnable {
+
+ Worker() {
+
+ }
+
+ @Override
+ public void run() {
+ while (!terminated) {
+ ZkNodeProps message = null;
+ ZkNodeProps bulkMessage = new ZkNodeProps();
+ bulkMessage.getProperties().put("operation", "state");
+ try {
+ message = workQueue.poll(5, TimeUnit.SECONDS);
+ if (message != null) {
+ log.info("Got state message " + message);
+
+ bulkMessage(message, bulkMessage);
+
+ while (message != null) {
+ message = workQueue.poll(0, TimeUnit.SECONDS);
+ log.info("Got state message " + message);
+ if (message != null) {
+ bulkMessage(message, bulkMessage);
+ }
+ }
+ processMessage(bulkMessage);
+ }
+
+ } catch (InterruptedException e) {
+ return;
+ } catch (Exception e) {
+ log.error("Exception in StatePublisher run loop", e);
+ return;
+ }
+ }
+ }
+
+ private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) throws KeeperException, InterruptedException {
+ String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
+ String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
+ String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
+
+ bulkMessage.getProperties().put(core, collection + "," + state);
+ }
+
+ private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
+ overseerJobQueue.offer(Utils.toJSON(message));
+ }
+ }
+
+ public StatePublisher(ZkDistributedQueue overseerJobQueue) {
+ this.overseerJobQueue = overseerJobQueue;
+ }
+
+ public void submitState(ZkNodeProps stateMessage) {
+ workQueue.offer(stateMessage);
+ }
+
+ public void start() {
+ this.worker = new Worker();
+ workerFuture = ParWork.getRootSharedExecutor().submit(this.worker);
+ }
+
+ public void close() {
+ this.terminated = true;
+ try {
+ workerFuture.cancel(true);
+ } catch (Exception e) {
+ log.error("Exception waiting for close", e);
+ }
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 3a8b589..749e815 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -140,6 +140,8 @@ public class ZkController implements Closeable, Runnable {
private CloseTracker closeTracker;
private boolean closeZkClient = false;
+ private volatile StatePublisher statePublisher;
+
private volatile ZkDistributedQueue overseerJobQueue;
private volatile OverseerTaskQueue overseerCollectionQueue;
private volatile OverseerTaskQueue overseerConfigSetQueue;
@@ -675,6 +677,7 @@ public class ZkController implements Closeable, Runnable {
this.shudownCalled = true;
this.isClosed = true;
+ IOUtils.closeQuietly(statePublisher);
try (ParWork closer = new ParWork(this, true, true)) {
closer.collect(overseer);
@@ -1195,6 +1198,9 @@ public class ZkController implements Closeable, Runnable {
// }
// });
}
+ statePublisher = new StatePublisher(overseerJobQueue);
+ statePublisher.start();
+
// Do this last to signal we're up.
createEphemeralLiveNode();
@@ -1728,10 +1734,10 @@ public class ZkController implements Closeable, Runnable {
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state.toString());
- props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
+ // props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
props.put(CORE_NAME_PROP, cd.getName());
- props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
+ // props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ // props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
props.put(ZkStateReader.COLLECTION_PROP, collection);
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
@@ -1774,12 +1780,16 @@ public class ZkController implements Closeable, Runnable {
if (updateLastState) {
cd.getCloudDescriptor().setLastPublished(state);
}
- overseerJobQueue.offer(Utils.toJSON(m));
+ statePublisher.submitState(m);
} finally {
MDCLoggingContext.clear();
}
}
+ public void publish(ZkNodeProps message) {
+ statePublisher.submitState(message);
+ }
+
public ZkShardTerms getShardTerms(String collection, String shardId) {
return getCollectionTerms(collection).getShard(shardId);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index c53b2f4..812226c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -347,10 +347,6 @@ public class ZkDistributedQueue implements DistributedQueue {
// TODO change to accept json
Map json = (Map) Utils.fromJSON(data);
- final String operation = (String) json.get(Overseer.QUEUE_OPERATION);
-// if (!OPERATIONS.contains(operation)) {
-// throw new IllegalArgumentException("unknown operation:" + operation + " contents:" + json);
-// }
Timer.Context time = stats.time(dir + "_offer");
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index da787b9..8eb5377 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -42,7 +42,6 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.util.BoundedTreeSet;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -70,7 +69,7 @@ public class ZkStateWriter {
private volatile ClusterState cs;
private boolean dirty;
- private Set<String> collectionsToWrite = new HashSet<>();
+ private Set<String> collectionsToWrite = ConcurrentHashMap.newKeySet();
protected final ReentrantLock ourLock = new ReentrantLock(true);
protected final ReentrantLock writeLock = new ReentrantLock(true);
@@ -88,7 +87,9 @@ public class ZkStateWriter {
public void enqueueUpdate(ClusterState clusterState, ZkNodeProps message, boolean stateUpdate) throws Exception {
if (log.isDebugEnabled()) log.debug("enqueue update stateUpdate={}", stateUpdate);
+ //log.info("Get our write lock for enq");
ourLock.lock();
+ //log.info("Got our write lock for enq");
try {
AtomicBoolean changed = new AtomicBoolean();
@@ -143,26 +144,51 @@ public class ZkStateWriter {
}
switch (overseerAction) {
case STATE:
- log.info("state cmd");
- String collection = message.getStr("collection");
- DocCollection docColl = cs.getCollectionOrNull(collection);
- if (docColl != null) {
- Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
- if (replica != null) {
- Replica.State state = Replica.State.getState((String) message.get(ZkStateReader.STATE_PROP));
- log.info("set state {} {}", state, replica);
- if (state != replica.getState()) {
- replica.setState(state);
- changed.set(true);
- collectionsToWrite.add(collection);
+ // log.info("state cmd {}", message);
+ message.getProperties().remove("operation");
+
+ for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+ String core = entry.getKey();
+ String collectionAndStateString = (String) entry.getValue();
+ String[] collectionAndState = collectionAndStateString.split(",");
+ String collection = collectionAndState[0];
+ String setState = collectionAndState[1];
+ DocCollection docColl = cs.getCollectionOrNull(collection);
+ if (docColl != null) {
+ Replica replica = docColl.getReplica(core);
+ if (replica != null) {
+ if (setState.equals("leader")) {
+ log.info("set leader {} {}", message.getStr(ZkStateReader.CORE_NAME_PROP), replica);
+ Slice slice = docColl.getSlice(replica.getSlice());
+ slice.setLeader(replica);
+ replica.setState(Replica.State.ACTIVE);
+ replica.getProperties().put("leader", "true");
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica r : replicas) {
+ if (r != replica) {
+ r.getProperties().remove("leader");
+ }
+ }
+ changed.set(true);
+ collectionsToWrite.add(collection);
+ } else {
+
+ Replica.State state = Replica.State.getState(setState);
+
+ // log.info("set state {} {}", state, replica);
+ replica.setState(state);
+ changed.set(true);
+ collectionsToWrite.add(collection);
+ }
}
}
}
+
break;
case LEADER:
- log.info("leader cmd");
- collection = message.getStr("collection");
- docColl = cs.getCollectionOrNull(collection);
+ // log.info("leader cmd");
+ String collection = message.getStr("collection");
+ DocCollection docColl = cs.getCollectionOrNull(collection);
if (docColl != null) {
Slice slice = docColl.getSlice(message.getStr("shard"));
if (slice != null) {
@@ -199,11 +225,9 @@ public class ZkStateWriter {
Slice slice = docColl.getSlice(entry.getKey());
if (slice != null) {
Slice.State state = Slice.State.getState((String) entry.getValue());
- if (slice.getState() != state) {
- slice.setState(state);
- changed.set(true);
- collectionsToWrite.add(collection);
- }
+ slice.setState(state);
+ changed.set(true);
+ collectionsToWrite.add(collection);
}
}
}
@@ -253,13 +277,15 @@ public class ZkStateWriter {
*/
public void writePendingUpdates() {
- writeLock.lock();
- try {
+ // writeLock.lock();
+ // try {
+ // log.info("Get our write lock");
ourLock.lock();
try {
- if (!dirty) {
- return;
- }
+ // log.info("Got our write lock");
+// if (!dirty) {
+// return;
+// }
if (log.isDebugEnabled()) {
log.debug("writePendingUpdates {}", cs);
@@ -270,61 +296,31 @@ public class ZkStateWriter {
failedUpdates.clear();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lastFailedException.get());
}
- } finally {
- ourLock.unlock();
- }
+// } finally {
+// ourLock.unlock();
+// }
// wait to see our last publish version has propagated TODO don't wait on collections not hosted on overseer?
- cs.forEachCollection(collection -> {
- if (collectionsToWrite.contains(collection.getName())) {
- Integer v = null;
- try {
- //System.out.println("waiting to see state " + prevVersion);
- v = trackVersions.get(collection.getName());
- if (v == null) v = 0;
- if (v == 0) return;
- Integer version = v;
- try {
- log.debug("wait to see last published version for collection {} {}", collection.getName(), v);
- reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
- if (col == null) {
- return true;
- }
- // if (col != null) {
- // log.info("the version " + col.getZNodeVersion());
- // }
- if (col != null && col.getZNodeVersion() >= version) {
- if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
- // System.out.println("found the version");
- return true;
- }
- return false;
- });
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
- } catch (TimeoutException e) {
- log.warn("Timeout waiting to see written cluster state come back " + v);
- }
- }
-
- });
+ // waitForStateWePublishedToComeBack();
- ourLock.lock();
+ // ourLock.lock();
AtomicInteger lastVersion = new AtomicInteger();
- try {
+ //log.info("writing out state, looking at collections count={} toWrite={} {} : {}", cs.getCollectionsMap().size(), collectionsToWrite.size(), cs.getCollectionsMap().keySet(), collectionsToWrite);
+ //try {
cs.forEachCollection(collection -> {
+ // log.info("check collection {}", collection);
if (collectionsToWrite.contains(collection.getName())) {
+ // log.info("process collection {}", collection);
String name = collection.getName();
String path = ZkStateReader.getCollectionPath(collection.getName());
+ // log.info("process collection {} path {}", collection.getName(), path);
+
if (log.isDebugEnabled()) log.debug("process {}", collection);
Stat stat = new Stat();
- boolean success = false;
try {
-
+ // log.info("get data for {}", name);
byte[] data = Utils.toJSON(singletonMap(name, collection));
-
+ // log.info("got data for {} {}", name, data.length);
if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
try {
@@ -373,14 +369,15 @@ public class ZkStateWriter {
}
});
+ //log.info("Done with successful cluster write out");
dirty = false;
collectionsToWrite.clear();
} finally {
ourLock.unlock();
}
- } finally {
- writeLock.unlock();
- }
+// } finally {
+// writeLock.unlock();
+// }
// nocommit - harden against failures and exceptions
// if (log.isDebugEnabled()) {
@@ -389,6 +386,44 @@ public class ZkStateWriter {
}
+ private void waitForStateWePublishedToComeBack() {
+ cs.forEachCollection(collection -> {
+ if (collectionsToWrite.contains(collection.getName())) {
+ Integer v = null;
+ try {
+ //System.out.println("waiting to see state " + prevVersion);
+ v = trackVersions.get(collection.getName());
+ if (v == null) v = 0;
+ if (v == 0) return;
+ Integer version = v;
+ try {
+ log.info("wait to see last published version for collection {} {}", collection.getName(), v);
+ reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
+ if (col == null) {
+ return true;
+ }
+ // if (col != null) {
+ // log.info("the version " + col.getZNodeVersion());
+ // }
+ if (col != null && col.getZNodeVersion() >= version) {
+ if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
+ // System.out.println("found the version");
+ return true;
+ }
+ return false;
+ });
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ } catch (TimeoutException e) {
+ log.warn("Timeout waiting to see written cluster state come back " + v);
+ }
+ }
+
+ });
+ }
+
public ClusterState getClusterstate(boolean stateUpdate) {
ourLock.lock();
try {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index ee31f8f..0dbc692 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -55,30 +55,26 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
String collection = params.get("collection");
+ String shard = params.get(ZkStateReader.SHARD_ID_PROP);
+
if (collection == null) {
throw new IllegalArgumentException("collection cannot be null");
}
- String shardId = params.get("shardId");
- String nodeName = params.get("nodeName");
Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
- Boolean checkLive = params.getBool("checkLive");
- Boolean onlyIfLeader = params.getBool("onlyIfLeader");
log.info(
- "Going to wait for core: {}, state: {}, checkLive: {}, onlyIfLeader: {}: params={}",
- cname, waitForState, checkLive, onlyIfLeader, params);
+ "Going to wait for core: {}, state: {}: params={}",
+ cname, waitForState, params);
assert TestInjection.injectPrepRecoveryOpPauseForever();
CoreContainer coreContainer = it.handler.coreContainer;
- // wait long enough for the leader conflict to work itself out plus a little extra
- int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
-
AtomicReference<String> errorMessage = new AtomicReference<>();
+
try {
- coreContainer.getZkController().getZkStateReader().waitForState(collection, conflictWaitMs, TimeUnit.MILLISECONDS, (n, c) -> {
+ coreContainer.getZkController().getZkStateReader().waitForState(collection, 10, TimeUnit.SECONDS, (n, c) -> {
if (c == null) {
log.info("collection not found {}", collection);
return false;
@@ -86,54 +82,15 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
// wait until we are sure the recovering node is ready
// to accept updates
- Replica.State state = null;
- boolean live = false;
final Replica replica = c.getReplica(cname);
if (replica != null) {
if (replica != null) {
- state = replica.getState();
- live = n.contains(nodeName);
-
- try {
- ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collection, c.getSlice(replica).getName());
- // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
- if (waitForState == Replica.State.RECOVERING && shardTerms.registered(cname) && shardTerms.skipSendingUpdatesTo(cname)) {
- // The replica changed its term, then published itself as RECOVERING.
- // This core already see replica as RECOVERING
- // so it is guarantees that a live-fetch will be enough for this core to see max term published
- log.info("refresh shard terms for core {}", cname);
- shardTerms.refreshTerms();
- }
- } catch (NullPointerException e) {
- if (log.isDebugEnabled()) log.debug("No shards found", e);
- // likely deleted shard/collection
- }
- if (log.isInfoEnabled()) {
- log.info(
- "In WaitForState(" + waitForState + "): collection=" + collection +
- ", thisCore=" + cname +
- ", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state
- + ", nodeName=" + nodeName +
- ", core=" + cname
- + ", nodeProps: " + replica); //LOGOK
- }
-
- log.info("replica={} state={} waitForState={}", replica, state, waitForState);
- if (replica != null && (state == waitForState)) {
- if (checkLive == null) {
- log.info("checkLive=false, return true");
- return true;
- } else if (checkLive && live) {
- log.info("checkLive=true live={}, return true", live);
- return true;
- } else if (!checkLive && !live) {
- log.info("checkLive=false live={}, return true", live);
- return true;
- }
+ if (replica.getState() == waitForState) {
+ log.info("replica={} state={} waitForState={}", replica, replica.getState(), waitForState);
+ return true;
}
}
}
-
return false;
});
@@ -141,9 +98,24 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
SolrZkClient.checkInterrupted(e);
String error = errorMessage.get();
if (error == null)
- error = "Timeout waiting for collection state.";
+ error = "Timeout waiting for collection state. \n" + coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
}
+ try {
+ ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collection, shard);
+ // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+ if (waitForState == Replica.State.RECOVERING && shardTerms.registered(cname) && shardTerms.skipSendingUpdatesTo(cname)) {
+ // The replica changed its term, then published itself as RECOVERING.
+ // This core already see replica as RECOVERING
+ // so it is guarantees that a live-fetch will be enough for this core to see max term published
+ log.info("refresh shard terms for core {}", cname);
+ shardTerms.refreshTerms();
+ }
+ } catch (NullPointerException e) {
+ if (log.isDebugEnabled()) log.debug("No shards found", e);
+ // likely deleted shard/collection
+ }
+
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
index a18af01..b294e5d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
@@ -272,7 +272,7 @@ public class CoreAdminRequest extends SolrRequest<CoreAdminResponse> {
params.set( "collection", collectionName);
}
if (shardId != null) {
- params.set( "shardId", shardId);
+ params.set( ZkStateReader.SHARD_ID_PROP, shardId);
}
return params;