You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/11 03:42:44 UTC
[lucene-solr] branch reference_impl_dev updated: @1158 Simplify state update writing.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new ec23836 @1158 Simplify state update writing.
ec23836 is described below
commit ec23836f25c4eddd8fcae10b8f975a404a4dcb79
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Nov 10 21:40:05 2020 -0600
@1158 Simplify state update writing.
---
.../solr/cloud/OverseerTaskExecutorTask.java | 4 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 60 +-----
.../solr/cloud/api/collections/MoveReplicaCmd.java | 2 +-
.../OverseerCollectionMessageHandler.java | 4 +-
.../solr/cloud/api/collections/SplitShardCmd.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 213 +++++++++++----------
.../apache/solr/common/cloud/ZkStateReader.java | 4 +-
7 files changed, 124 insertions(+), 165 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
index 8e95c17..2277034 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
@@ -73,10 +73,10 @@ public class OverseerTaskExecutorTask implements Runnable {
ClusterState cs = zkStateWriter.getClusterstate(true);
log.info("Process message {} {}", message, operation);
- ClusterState newClusterState = processMessage(message, operation, cs);
+ // ClusterState newClusterState = processMessage(message, operation, cs);
log.info("Enqueue message {}", operation);
- zkStateWriter.enqueueUpdate(newClusterState, true);
+ zkStateWriter.enqueueUpdate(null, message, true);
if (log.isDebugEnabled()) log.debug("State update consumed from queue {}", message);
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d6ceeb3..1f88d0f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -625,6 +625,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
if (replicaType == Replica.Type.TLOG) {
+ log.info("Stopping replication from leader for {}", coreName);
zkController.stopReplicationFromLeader(coreName);
}
@@ -635,16 +636,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// though
try {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
- final Replica leader = getLeader(ourUrl, this.coreDescriptor, true);
- if (isClosed()) {
- log.info("RecoveryStrategy has been closed");
- break;
- }
-
- boolean isLeader = leader.getCoreUrl().equals(ourUrl);
- if (isLeader && !cloudDesc.isLeader() && leader.getState().equals(Replica.State.ACTIVE)) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
- }
+ final Replica leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 15000);
log.info("Begin buffering updates. core=[{}]", coreName);
// recalling buffer updates will drop the old buffer tlog
@@ -703,6 +695,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (syncSuccess) {
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
+ log.info("PeerSync was successful, commit to force open a new searcher");
// force open a new searcher
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
req.close();
@@ -734,23 +727,17 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Replication Recovery was successful.");
successfulRecovery = true;
} catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e, true);
+ log.info("Interrupted or already closed, bailing on recovery");
return;
- } catch (NullPointerException e) {
- if (log.isDebugEnabled()) log.debug("NullPointerException", e);
- break;
} catch (Exception e) {
SolrException.log(log, "Error while trying to recover", e);
}
} catch (Exception e) {
- if (core.getCoreContainer().isShutDown()) {
- break;
- }
SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
} finally {
if (successfulRecovery) {
- log.info("Registering as Active after recovery.");
+ log.info("Registering as Active after recovery {}", coreName);
try {
if (replicaType == Replica.Type.TLOG) {
zkController.startReplicationFromLeader(coreName, true);
@@ -768,6 +755,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
close = true;
recoveryListener.recovered();
}
+ } else {
+ log.info("Recovery was not sucessful, will not register as ACTIVE {}", coreName);
}
}
@@ -829,39 +818,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Finished recovery process, successful=[{}]", successfulRecovery);
}
- private final Replica getLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
- throws Exception {
- int numTried = 0;
- while (true) {
- CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
- DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
- if (!isClosed() && mayPutReplicaAsDown && numTried == 1 && docCollection.getReplica(coreDesc.getName()).getState() == Replica.State.ACTIVE) {
- // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
- // zkController.publish(coreDesc, Replica.State.DOWN);
- // TODO: We should be in recovery and ignored by queries?
- }
- numTried++;
-
- if (numTried > 3) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Could not get leader");
- // instead of hammering on the leader,
- // let recovery process continue normally
- }
-
- Replica leaderReplica = null;
-
- try {
- leaderReplica = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
- } catch (SolrException e) {
- Thread.sleep(500);
- log.info("Could not find leader, looping again ...", e);
- continue;
- }
-
- return leaderReplica;
- }
- }
-
public static Runnable testing_beforeReplayBufferingUpdates;
final private void replay(SolrCore core)
@@ -926,7 +882,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
final public boolean isClosed() {
- return close || cc.isShutDown();
+ return close;
}
final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index 20bcfd9..691e267 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -290,7 +290,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
AddReplicaCmd.Response response = ocmh.addReplicaWithResp(clusterState, addReplicasProps, addResult);
- ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState, false);
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState, null,false);
ocmh.overseer.writePendingUpdates();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 27a6b9f..b901efc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -271,7 +271,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
if (log.isDebugEnabled()) log.debug("Command returned clusterstate={} results={}", responce.clusterState, results);
if (responce.clusterState != null) {
- overseer.getZkStateWriter().enqueueUpdate(responce.clusterState, false);
+ overseer.getZkStateWriter().enqueueUpdate(responce.clusterState, null, false);
overseer.writePendingUpdates();
}
@@ -281,7 +281,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
AddReplicaCmd.Response resp = responce.asyncFinalRunner.call();
if (log.isDebugEnabled()) log.debug("Finalize after Command returned clusterstate={}", resp.clusterState);
if (resp.clusterState != null) {
- overseer.getZkStateWriter().enqueueUpdate(responce.clusterState, false);
+ overseer.getZkStateWriter().enqueueUpdate(responce.clusterState, null,false);
overseer.writePendingUpdates();
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index a2b9b22..0634699 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -348,7 +348,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
// firstReplicaFutures.add(future);
}
- ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState, false);
+ ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState, null,false);
ocmh.overseer.writePendingUpdates();
firstReplicaFutures.forEach(future -> {
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index eeaecaf..da787b9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
@@ -37,9 +39,11 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.BoundedTreeSet;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -81,16 +85,17 @@ public class ZkStateWriter {
cs = zkStateReader.getClusterState();
}
- public void enqueueUpdate(ClusterState clusterState, boolean stateUpdate) throws Exception {
+ public void enqueueUpdate(ClusterState clusterState, ZkNodeProps message, boolean stateUpdate) throws Exception {
if (log.isDebugEnabled()) log.debug("enqueue update stateUpdate={}", stateUpdate);
ourLock.lock();
try {
AtomicBoolean changed = new AtomicBoolean();
- if (clusterState == null) {
- throw new NullPointerException("clusterState cannot be null");
- }
+
if (!stateUpdate) {
+ if (clusterState == null) {
+ throw new NullPointerException("clusterState cannot be null");
+ }
changed.set(true);
clusterState.forEachCollection(collection -> {
DocCollection currentCollection = cs.getCollectionOrNull(collection.getName());
@@ -100,7 +105,12 @@ public class ZkStateWriter {
Slice currentSlice = currentCollection.getSlice(slice.getName());
if (currentSlice != null) {
slice.setState(currentSlice.getState());
+ Replica leader = currentSlice.getLeader();
slice.setLeader(currentSlice.getLeader());
+ if (leader != null) {
+ leader.setState(Replica.State.ACTIVE);
+ leader.getProperties().put("leader", "true");
+ }
}
}
@@ -126,112 +136,97 @@ public class ZkStateWriter {
this.cs = clusterState;
} else {
- clusterState.forEachCollection(newCollection -> {
-
- DocCollection currentCollection = cs.getCollectionOrNull(newCollection.getName());
- if (currentCollection == null) {
- log.error("Could not update state for non existing collection {}", newCollection.getName());
- return;
- }
- for (Slice slice : newCollection) {
- Slice currentSlice = currentCollection.getSlice(slice.getName());
- if (currentSlice != null) {
- if (log.isDebugEnabled()) log.debug("set slice state to {} {} leader={}", slice.getName(), slice.getState(), slice.getLeader());
- Replica leader = slice.getLeader();
- if (leader != null) {
- currentSlice.setState(slice.getState());
- currentSlice.setLeader(slice.getLeader());
- currentSlice.getLeader().getProperties().put("leader", "true");
- currentSlice.getLeader().getProperties().put("state", Replica.State.ACTIVE.toString());
- changed.set(true);
+ final String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ OverseerAction overseerAction = OverseerAction.get(operation);
+ if (overseerAction == null) {
+ throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
+ }
+ switch (overseerAction) {
+ case STATE:
+ log.info("state cmd");
+ String collection = message.getStr("collection");
+ DocCollection docColl = cs.getCollectionOrNull(collection);
+ if (docColl != null) {
+ Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
+ if (replica != null) {
+ Replica.State state = Replica.State.getState((String) message.get(ZkStateReader.STATE_PROP));
+ log.info("set state {} {}", state, replica);
+ if (state != replica.getState()) {
+ replica.setState(state);
+ changed.set(true);
+ collectionsToWrite.add(collection);
+ }
}
}
- for (Replica replica : slice) {
- Replica currentReplica = currentCollection.getReplica(replica.getName());
- if (currentReplica != null) {
- if (log.isDebugEnabled()) log.debug("set replica state to {} isLeader={}", replica.getState(), replica.getProperty("leader"));
- currentReplica.setState(replica.getState());
- String leader = replica.getProperty("leader");
- if (leader != null || slice.getLeader() != null && replica.getName().equals(slice.getLeader().getName())) {
- currentReplica.getProperties().put("leader", "true");
- currentReplica.getProperties().put("state", Replica.State.ACTIVE.toString());
+ break;
+ case LEADER:
+ log.info("leader cmd");
+ collection = message.getStr("collection");
+ docColl = cs.getCollectionOrNull(collection);
+ if (docColl != null) {
+ Slice slice = docColl.getSlice(message.getStr("shard"));
+ if (slice != null) {
+ Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
+ if (replica != null) {
+ log.info("set leader {} {}", message.getStr(ZkStateReader.CORE_NAME_PROP), replica);
+ slice.setLeader(replica);
+ replica.setState(Replica.State.ACTIVE);
+ replica.getProperties().put("leader", "true");
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica r : replicas) {
+ if (r != replica) {
+ r.getProperties().remove("leader");
+ }
+ }
+ changed.set(true);
+ collectionsToWrite.add(collection);
}
- // nocommit
- // else if (leader == null) {
- // currentReplica.getProperties().remove("leader");
- // }
-
- if (slice.getLeader() != null && slice.getLeader().getName().equals(replica.getName())) {
- currentReplica.getProperties().put("leader", "true");
- currentReplica.getProperties().put("state", Replica.State.ACTIVE.toString());
+ }
+ }
+ break;
+// case ADDROUTINGRULE:
+// return new SliceMutator(cloudManager).addRoutingRule(clusterState, message);
+// case REMOVEROUTINGRULE:
+// return new SliceMutator(cloudManager).removeRoutingRule(clusterState, message);
+ case UPDATESHARDSTATE:
+ collection = message.getStr("collection");
+ message.getProperties().remove("collection");
+ message.getProperties().remove("operation");
+
+ docColl = cs.getCollectionOrNull(collection);
+ if (docColl != null) {
+ for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+ Slice slice = docColl.getSlice(entry.getKey());
+ if (slice != null) {
+ Slice.State state = Slice.State.getState((String) entry.getValue());
+ if (slice.getState() != state) {
+ slice.setState(state);
+ changed.set(true);
+ collectionsToWrite.add(collection);
+ }
+ }
}
-
- Replica thereplica = cs.getCollectionOrNull(newCollection.getName()).getReplica(replica.getName());
- if (log.isDebugEnabled()) log.debug("Check states nreplica={} ceplica={}", replica.getState(), thereplica.getState());
-
- if (replica.getState() == Replica.State.ACTIVE) {
- if (log.isDebugEnabled()) log.debug("Setting replica to active state leader={} state={} col={}", leader, cs, currentCollection);
+ }
+ break;
+ case DOWNNODE:
+ collection = message.getStr("collection");
+ docColl = cs.getCollectionOrNull(collection);
+ if (docColl != null) {
+ List<Replica> replicas = docColl.getReplicas();
+ for (Replica replica : replicas) {
+ if (replica.getState() != Replica.State.DOWN) {
+ replica.setState(Replica.State.DOWN);
+ changed.set(true);
+ collectionsToWrite.add(collection);
}
-
- changed.set(true); // nocommit - only if really changed
}
-
}
- }
- });
+ break;
+ default:
+ throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
- cs.forEachCollection(collection -> {
- Object removed = collection.getProperties().remove("replicationFactor");
- if (removed != null) {
- changed.set(true); // nocommit - only if really changed
- }
- removed = collection.getProperties().remove("pullReplicas");
- if (removed != null) {
- changed.set(true); // nocommit - only if really changed
- }
- removed = collection.getProperties().remove("maxShardsPerNode");
- if (removed != null) {
- changed.set(true); // nocommit - only if really changed
- }
- removed = collection.getProperties().remove("nrtReplicas");
- if (removed != null) {
- changed.set(true); // nocommit - only if really changed
- }
- removed = collection.getProperties().remove("tlogReplicas");
- if (removed != null) {
- changed.set(true); // nocommit - only if really changed
- }
-
- for (Slice slice : collection) {
- Replica leader = slice.getLeader();
- if (leader != null && leader.getState() != Replica.State.ACTIVE) {
- slice.setLeader(null);
- leader.getProperties().remove("leader");
- changed.set(true);
- }
-
- for (Replica replica : slice) {
- String isLeader = replica.getProperty("leader");
- if (log.isDebugEnabled()) log.debug("isleader={} slice={} state={} sliceLeader={}", isLeader, slice.getName(), slice.getState(), slice.getLeader());
- if (Boolean.parseBoolean(isLeader) && replica.getState() != Replica.State.ACTIVE) {
- if (log.isDebugEnabled()) log.debug("clear leader isleader={} slice={} state={} sliceLeader={}", isLeader, slice.getName(), slice.getState(), slice.getLeader());
- replica.getProperties().remove("leader");
- changed.set(true); // nocommit - only if really changed
- }
-
- removed = replica.getProperties().remove("numShards");
- if (removed != null) {
- changed.set(true); // nocommit - only if really changed
- }
- removed = replica.getProperties().remove("base_url");
- if (removed != null) {
- changed.set(true); // nocommit - only if really changed
- }
+ }
- }
- }
- });
- collectionsToWrite.addAll(clusterState.getCollectionsMap().keySet());
}
if (stateUpdate) {
@@ -338,12 +333,20 @@ public class ZkStateWriter {
Integer version;
if (v != null) {
version = v;
+ lastVersion.set(version);
+ reader.getZkClient().setData(path, data, version, true);
} else {
- version = reader.getZkClient().exists(path, null).getVersion();
+ Stat existsStat = reader.getZkClient().exists(path, null);
+ if (existsStat == null) {
+ version = 0;
+ lastVersion.set(version);
+ reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
+ } else {
+ version = stat.getVersion();
+ lastVersion.set(version);
+ reader.getZkClient().setData(path, data, version, true);
+ }
}
- lastVersion.set(version);
- reader.getZkClient().setData(path, data, version, true);
-
trackVersions.put(collection.getName(), version + 1);
} catch (KeeperException.NoNodeException e) {
if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 2faa060..8435f0a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -222,7 +222,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
final Set<T> stateWatchers = ConcurrentHashMap.newKeySet();
public boolean canBeRemoved() {
- return coreRefCount.get() <=0 && stateWatchers.size() <= 0;
+ return coreRefCount.get() < 0 && stateWatchers.size() <= 0;
}
}
@@ -1916,7 +1916,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
private boolean updateWatchedCollection(String coll, DocCollection newState) {
if (newState == null) {
- log.debug("Removing cached collection state for [{}]", coll);
+ if (log.isDebugEnabled()) log.debug("Removing cached collection state for [{}]", coll);
watchedCollectionStates.remove(coll);
return true;
}