You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2019/03/19 17:26:44 UTC
[lucene-solr] branch master updated: SOLR-12955: Refactored
DistributedUpdateProcessor to put SolrCloud specifics into a subclass
Closes #528
This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 5b7866b SOLR-12955: Refactored DistributedUpdateProcessor to put SolrCloud specifics into a subclass Closes #528
5b7866b is described below
commit 5b7866b0851eff66cb7e929beef5249e3c72ac36
Author: Bar Rotstein <ba...@gmail.com>
AuthorDate: Tue Mar 19 13:26:31 2019 -0400
SOLR-12955: Refactored DistributedUpdateProcessor to put SolrCloud specifics into a subclass
Closes #528
---
solr/CHANGES.txt | 4 +
.../solr/update/processor/CdcrUpdateProcessor.java | 2 +-
.../processor/DistributedUpdateProcessor.java | 1307 ++------------------
.../DistributedUpdateProcessorFactory.java | 9 +-
.../processor/DistributedZkUpdateProcessor.java | 1235 ++++++++++++++++++
.../DocBasedVersionConstraintsProcessor.java | 3 +-
.../SkipExistingDocumentsProcessorFactory.java | 3 +-
.../AtomicUpdateProcessorFactoryTest.java | 2 +-
.../processor/DistributedUpdateProcessorTest.java | 4 +-
.../src/java/org/apache/solr/SolrTestCaseJ4.java | 11 +
10 files changed, 1374 insertions(+), 1206 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f90bc3b..e9661ff 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -128,6 +128,7 @@ Bug Fixes
Improvements
----------------------
+
* SOLR-12999: Index replication could delete segments before downloading segments from master if there is not enough
disk space (noble)
@@ -166,6 +167,9 @@ Other Changes
* SOLR-8033: Remove debug if branch in HdfsTransactionLog (Kevin Risden)
+* SOLR-12955: Refactored DistributedUpdateProcessor to put SolrCloud functionality into a subclass.
+ (Bar Rotstein, David Smiley)
+
================== 8.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
index ee45467..fe13a91 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
* by the target cluster.
* </p>
*/
-public class CdcrUpdateProcessor extends DistributedUpdateProcessor {
+public class CdcrUpdateProcessor extends DistributedZkUpdateProcessor {
public static final String CDCR_UPDATE = "cdcr.update";
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index a5d2898..50660cb 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -16,23 +16,11 @@
*/
package org.apache.solr.update.processor;
-import static org.apache.solr.common.params.CommonParams.DISTRIB;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.lucene.util.BytesRef;
@@ -41,38 +29,20 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
-import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cloud.ZkShardTerms;
-import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.RoutingRule;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.Slice.State;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -80,14 +50,9 @@ import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.MergeIndexesCommand;
-import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
-import org.apache.solr.update.SolrCmdDistributor.StdNode;
-import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateShardHandler;
@@ -95,10 +60,12 @@ import org.apache.solr.update.VersionBucket;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@@ -109,7 +76,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public static final String DISTRIB_FROM_PARENT = "distrib.from.parent";
public static final String DISTRIB_FROM = "distrib.from";
public static final String DISTRIB_INPLACE_PREVVERSION = "distrib.inplace.prevversion";
- private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers";
+ protected static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
@@ -150,9 +117,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// used to assert we don't call finish more than once, see finish()
private boolean finished = false;
- private final SolrQueryRequest req;
- private final SolrQueryResponse rsp;
- private final UpdateRequestProcessor next;
+ protected final SolrQueryRequest req;
+ protected final SolrQueryResponse rsp;
private final AtomicUpdateDocumentMerger docMerger;
private final UpdateLog ulog;
@@ -167,47 +133,28 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final SchemaField idField;
- private SolrCmdDistributor cmdDistrib;
-
- private final boolean zkEnabled;
-
- private final CloudDescriptor cloudDesc;
- private final String collection;
- private final ZkController zkController;
-
// these are setup at the start of each request processing
// method in this update processor
- private boolean isLeader = true;
- private boolean forwardToLeader = false;
- private boolean isSubShardLeader = false;
- private List<Node> nodes;
- private Set<String> skippedCoreNodeNames;
- private boolean isIndexChanged = false;
-
- private boolean readOnlyCollection = false;
+ protected boolean isLeader = true;
+ protected boolean forwardToLeader = false;
+ protected boolean isSubShardLeader = false;
+ protected boolean isIndexChanged = false;
/**
* Number of times requests forwarded to some other shard's leader can be retried
*/
- private final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT;
+ protected final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT;
/**
* Number of times requests from leaders to followers can be retried
*/
- private final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
-
- private UpdateCommand updateCommand; // the current command this processor is working on.
+ protected final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
- //used for keeping track of replicas that have processed an add/update from the leader
- private RollupRequestReplicationTracker rollupReplicationTracker = null;
- private LeaderRequestReplicationTracker leaderReplicationTracker = null;
+ protected UpdateCommand updateCommand; // the current command this processor is working on.
- // should we clone the document before sending it to replicas?
- // this is set to true in the constructor if the next processors in the chain
- // are custom and may modify the SolrInputDocument racing with its serialization for replication
- private final boolean cloneRequiredOnLeader;
- private final Replica.Type replicaType;
+ protected final Replica.Type replicaType;
- public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp,
+ UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
}
@@ -215,12 +162,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
* @lucene.experimental
*/
public DistributedUpdateProcessor(SolrQueryRequest req,
- SolrQueryResponse rsp, AtomicUpdateDocumentMerger docMerger, UpdateRequestProcessor next) {
+ SolrQueryResponse rsp, AtomicUpdateDocumentMerger docMerger,
+ UpdateRequestProcessor next) {
super(next);
this.rsp = rsp;
- this.next = next;
this.docMerger = docMerger;
this.idField = req.getSchema().getUniqueKeyField();
+ this.req = req;
+ this.replicaType = computeReplicaType();
// version init
this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
@@ -231,473 +180,32 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO: better way to get the response, or pass back info to it?
// SolrRequestInfo reqInfo = returnVersions ? SolrRequestInfo.getRequestInfo() : null;
- this.req = req;
-
// this should always be used - see filterParams
DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
(this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, CommonParams.VERSION_FIELD,
UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS);
- CoreContainer cc = req.getCore().getCoreContainer();
-
- this.zkEnabled = cc.isZooKeeperAware();
- zkController = cc.getZkController();
- if (zkEnabled) {
- cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
- }
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
- cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
-
- if (cloudDesc != null) {
- collection = cloudDesc.getCollectionName();
- replicaType = cloudDesc.getReplicaType();
- DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);
- if (coll != null) {
- // check readOnly property in coll state
- readOnlyCollection = coll.isReadOnly();
- }
- } else {
- collection = null;
- replicaType = Replica.Type.NRT;
- }
-
- boolean shouldClone = false;
- UpdateRequestProcessor nextInChain = next;
- while (nextInChain != null) {
- Class<? extends UpdateRequestProcessor> klass = nextInChain.getClass();
- if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class
- && klass != RunUpdateProcessor.class
- && klass != TolerantUpdateProcessor.class) {
- shouldClone = true;
- break;
- }
- nextInChain = nextInChain.next;
- }
- cloneRequiredOnLeader = shouldClone;
- }
-
- private boolean isReadOnly() {
- return readOnlyCollection || req.getCore().readOnly;
- }
-
- private List<Node> setupRequest(String id, SolrInputDocument doc) {
- return setupRequest(id, doc, null);
- }
-
- private List<Node> setupRequest(String id, SolrInputDocument doc, String route) {
- // if we are in zk mode...
- if (!zkEnabled) {
- return null;
- }
-
- assert TestInjection.injectUpdateRandomPause();
-
- if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
- isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
- forwardToLeader = false;
- return null;
- }
-
- ClusterState cstate = zkController.getClusterState();
- DocCollection coll = cstate.getCollection(collection);
- Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
-
- if (slice == null) {
- // No slice found. Most strict routers will have already thrown an exception, so a null return is
- // a signal to use the slice of this core.
- // TODO: what if this core is not in the targeted collection?
- String shardId = cloudDesc.getShardId();
- slice = coll.getSlice(shardId);
- if (slice == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
- }
- }
-
- DistribPhase phase =
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
-
- if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
- if (cloudDesc.isLeader()) {
- // locally we think we are leader but the request says it came FROMLEADER
- // that could indicate a problem, let the full logic below figure it out
- } else {
-
- assert TestInjection.injectFailReplicaRequests();
-
- isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
- forwardToLeader = false;
- return null;
- }
- }
-
- String shardId = slice.getName();
-
- try {
- // Not equivalent to getLeaderProps, which retries to find a leader.
- // Replica leader = slice.getLeader();
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
- isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
-
- if (!isLeader) {
- isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
- if (isSubShardLeader) {
- shardId = cloudDesc.getShardId();
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
- }
- }
-
- doDefensiveChecks(phase);
-
- // if request is coming from another collection then we want it to be sent to all replicas
- // even if its phase is FROMLEADER
- String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);
-
- if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) {
- // we are coming from the leader, just go local - add no urls
- forwardToLeader = false;
- return null;
- } else if (isLeader || isSubShardLeader) {
- // that means I want to forward onto my replicas...
- // so get the replicas...
- forwardToLeader = false;
- ClusterState clusterState = zkController.getZkStateReader().getClusterState();
- String leaderCoreNodeName = leaderReplica.getName();
- List<Replica> replicas = clusterState.getCollection(collection)
- .getSlice(shardId)
- .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
- replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
- if (replicas.isEmpty()) {
- return null;
- }
-
- // check for test param that lets us miss replicas
- String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
- Set<String> skipListSet = null;
- if (skipList != null) {
- skipListSet = new HashSet<>(skipList.length);
- skipListSet.addAll(Arrays.asList(skipList));
- log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
- }
-
- List<Node> nodes = new ArrayList<>(replicas.size());
- skippedCoreNodeNames = new HashSet<>();
- ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
- for (Replica replica: replicas) {
- String coreNodeName = replica.getName();
- if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
- log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
- } else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
- log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
- skippedCoreNodeNames.add(replica.getName());
- } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
- skippedCoreNodeNames.add(replica.getName());
- } else {
- nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
- }
- }
- return nodes;
-
- } else {
- // I need to forward on to the leader...
- forwardToLeader = true;
- return Collections.singletonList(
- new ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward));
- }
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
- }
- }
-
- /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */
- private boolean couldIbeSubShardLeader(DocCollection coll) {
- // Could I be the leader of a shard in "construction/recovery" state?
- String myShardId = cloudDesc.getShardId();
- Slice mySlice = coll.getSlice(myShardId);
- State state = mySlice.getState();
- return state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY;
- }
-
- /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */
- private boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) throws InterruptedException {
- // Am I the leader of a shard in "construction/recovery" state?
- String myShardId = cloudDesc.getShardId();
- Slice mySlice = coll.getSlice(myShardId);
- final State state = mySlice.getState();
- if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
- Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
- boolean amILeader = myLeader.getName().equals(cloudDesc.getCoreNodeName());
- if (amILeader) {
- // Does the document belong to my hash range as well?
- DocRouter.Range myRange = mySlice.getRange();
- if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
- if (parentSlice != null) {
- boolean isSubset = parentSlice.getRange() != null && myRange.isSubsetOf(parentSlice.getRange());
- return isSubset && coll.getRouter().isTargetSlice(id, doc, req.getParams(), myShardId, coll);
- } else {
- // delete by query case -- as long as I am a sub shard leader we're fine
- return true;
- }
- }
- }
- return false;
- }
-
- private List<Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
- ClusterState clusterState = zkController.getZkStateReader().getClusterState();
- String leaderCoreNodeName = leaderReplica.getName();
- List<Replica> replicas = clusterState.getCollection(collection)
- .getSlice(shardId)
- .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
- replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
- if (replicas.isEmpty()) {
- return null;
- }
-
- // check for test param that lets us miss replicas
- String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
- Set<String> skipListSet = null;
- if (skipList != null) {
- skipListSet = new HashSet<>(skipList.length);
- skipListSet.addAll(Arrays.asList(skipList));
- log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
- }
-
- List<Node> nodes = new ArrayList<>(replicas.size());
- skippedCoreNodeNames = new HashSet<>();
- ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
- for (Replica replica : replicas) {
- String coreNodeName = replica.getName();
- if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
- log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
- } else if (zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
- log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
- skippedCoreNodeNames.add(replica.getName());
- } else if (!clusterState.getLiveNodes().contains(replica.getNodeName())
- || replica.getState() == Replica.State.DOWN) {
- skippedCoreNodeNames.add(replica.getName());
- } else {
- nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId));
- }
- }
- return nodes;
- }
-
- /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD} */
- private List<Node> getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) {
- Collection<Slice> allSlices = coll.getSlices();
- List<Node> nodes = null;
- for (Slice aslice : allSlices) {
- final Slice.State state = aslice.getState();
- if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
- DocRouter.Range myRange = coll.getSlice(shardId).getRange();
- if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
- boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
- if (isSubset &&
- (docId == null // in case of deletes
- || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
- Replica sliceLeader = aslice.getLeader();
- // slice leader can be null because node/shard is created zk before leader election
- if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
- if (nodes == null) nodes = new ArrayList<>();
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
- nodes.add(new StdNode(nodeProps, coll.getName(), aslice.getName()));
- }
- }
- }
- }
- return nodes;
- }
-
- /** For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#MIGRATE} */
- private List<Node> getNodesByRoutingRules(ClusterState cstate, DocCollection coll, String id, SolrInputDocument doc) {
- DocRouter router = coll.getRouter();
- List<Node> nodes = null;
- if (router instanceof CompositeIdRouter) {
- CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
- String myShardId = cloudDesc.getShardId();
- Slice slice = coll.getSlice(myShardId);
- Map<String, RoutingRule> routingRules = slice.getRoutingRules();
- if (routingRules != null) {
-
- // delete by query case
- if (id == null) {
- for (Entry<String, RoutingRule> entry : routingRules.entrySet()) {
- String targetCollectionName = entry.getValue().getTargetCollectionName();
- final DocCollection docCollection = cstate.getCollectionOrNull(targetCollectionName);
- if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) {
- final Slice[] activeSlices = docCollection.getActiveSlicesArr();
- Slice any = activeSlices[0];
- if (nodes == null) nodes = new ArrayList<>();
- nodes.add(new StdNode(new ZkCoreNodeProps(any.getLeader())));
- }
- }
- return nodes;
- }
-
- String routeKey = SolrIndexSplitter.getRouteKey(id);
- if (routeKey != null) {
- RoutingRule rule = routingRules.get(routeKey + "!");
- if (rule != null) {
- if (! rule.isExpired()) {
- List<DocRouter.Range> ranges = rule.getRouteRanges();
- if (ranges != null && !ranges.isEmpty()) {
- int hash = compositeIdRouter.sliceHash(id, doc, null, coll);
- for (DocRouter.Range range : ranges) {
- if (range.includes(hash)) {
- DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName());
- Collection<Slice> activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl);
- if (activeSlices == null || activeSlices.isEmpty()) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName());
- }
- Replica targetLeader = targetColl.getLeader(activeSlices.iterator().next().getName());
- nodes = new ArrayList<>(1);
- nodes.add(new StdNode(new ZkCoreNodeProps(targetLeader)));
- break;
- }
- }
- }
- } else {
- ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock();
- if (!ruleExpiryLock.isLocked()) {
- try {
- if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
- log.info("Going to expire routing rule");
- try {
- Map<String, Object> map = Utils.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(),
- ZkStateReader.COLLECTION_PROP, collection,
- ZkStateReader.SHARD_ID_PROP, myShardId,
- "routeKey", routeKey + "!");
- zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
- } catch (KeeperException e) {
- log.warn("Exception while removing routing rule for route key: " + routeKey, e);
- } catch (Exception e) {
- log.error("Exception while removing routing rule for route key: " + routeKey, e);
- } finally {
- ruleExpiryLock.unlock();
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- }
- }
- }
- return nodes;
}
- private void doDefensiveChecks(DistribPhase phase) {
- boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
- if (isReplayOrPeersync) return;
-
- String from = req.getParams().get(DISTRIB_FROM);
- ClusterState clusterState = zkController.getClusterState();
-
- DocCollection docCollection = clusterState.getCollection(collection);
- Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
- boolean localIsLeader = cloudDesc.isLeader();
- if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
- String fromShard = req.getParams().get(DISTRIB_FROM_PARENT);
- if (fromShard != null) {
- if (mySlice.getState() == Slice.State.ACTIVE) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Request says it is coming from parent shard leader but we are in active state");
- }
- // shard splitting case -- check ranges to see if we are a sub-shard
- Slice fromSlice = docCollection.getSlice(fromShard);
- DocRouter.Range parentRange = fromSlice.getRange();
- if (parentRange == null) parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
- if (mySlice.getRange() != null && !mySlice.getRange().isSubsetOf(parentRange)) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Request says it is coming from parent shard leader but parent hash range is not superset of my range");
- }
- } else {
- String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
- if (fromCollection == null) {
- log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
- SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
- solrExc.setMetadata("cause", "LeaderChanged");
- throw solrExc;
- }
- }
- }
-
- int count = 0;
- while (((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) && count < 5) {
- count++;
- // re-getting localIsLeader since we published to ZK first before setting localIsLeader value
- localIsLeader = cloudDesc.isLeader();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- if ((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) {
- log.error("ClusterState says we are the leader, but locally we don't think so");
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "ClusterState says we are the leader (" + zkController.getBaseUrl()
- + "/" + req.getCore().getName() + "), but locally we don't think so. Request came from " + from);
- }
+ /**
+ *
+ * @return the replica type of the collection.
+ */
+ protected Replica.Type computeReplicaType() {
+ return Replica.Type.NRT;
}
-
- // used for deleteByQuery to get the list of nodes this leader should forward to
- private List<Node> setupRequestForDBQ() {
- List<Node> nodes = null;
- String shardId = cloudDesc.getShardId();
-
- try {
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
- isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
-
- // TODO: what if we are no longer the leader?
-
- forwardToLeader = false;
- List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
- if (replicaProps != null) {
- nodes = new ArrayList<>(replicaProps.size());
- for (ZkCoreNodeProps props : replicaProps) {
- nodes.add(new StdNode(props, collection, shardId));
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
-
- return nodes;
+ boolean isLeader() {
+ return isLeader;
}
-
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
- if (isReadOnly()) {
- throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
- }
-
- updateCommand = cmd;
-
- if (zkEnabled) {
- zkCheck();
- nodes = setupRequest(cmd.getHashableId(), cmd.getSolrInputDocument());
- } else {
- isLeader = getNonZkLeaderAssumption(req);
- }
-
- // check if client has requested minimum replication factor information. will set replicationTracker to null if
- // we aren't the leader or subShardLeader
- checkReplicationTracker(cmd);
+ setupRequest(cmd);
// If we were sent a previous version, set this to the AddUpdateCommand (if not already set)
if (!cmd.isInPlaceUpdate()) {
@@ -717,58 +225,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
- if (zkEnabled && isLeader && !isSubShardLeader) {
- DocCollection coll = zkController.getClusterState().getCollection(collection);
- List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument());
- // the list<node> will actually have only one element for an add request
- if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
- params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
- cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
- }
- final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument());
- if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
- params.set(DISTRIB_FROM_COLLECTION, collection);
- params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
- cmdDistrib.distribAdd(cmd, nodesByRoutingRules, params, true);
- }
- }
-
- if (nodes != null) {
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM,
- (isLeader || isSubShardLeader ?
- DistribPhase.FROMLEADER.toString() :
- DistribPhase.TOLEADER.toString()));
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
-
- if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
- // TODO: Kept for rolling upgrades only. Should be removed in Solr 9
- params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
- }
-
- if (cmd.isInPlaceUpdate()) {
- params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
-
- // Use synchronous=true so that a new connection is used, instead
- // of the update being streamed through an existing streaming client.
- // When using a streaming client, the previous update
- // and the current in-place update (that depends on the previous update), if reordered
- // in the stream, can result in the current update being bottled up behind the previous
- // update in the stream and can lead to degraded performance.
- cmdDistrib.distribAdd(cmd, nodes, params, true, rollupReplicationTracker, leaderReplicationTracker);
- } else {
- cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
- }
- }
+ doDistribAdd(cmd);
// TODO: what to do when no idField?
if (returnVersions && rsp != null && idField != null) {
@@ -788,218 +245,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
- // helper method, processAdd was getting a bit large.
- // Sets replicationTracker = null if we aren't the leader
- // We have two possibilities here:
- //
- // 1> we are a leader: Allocate a LeaderTracker and, if we're getting the original request, a RollupTracker
- // 2> we're a follower: allocat a RollupTracker
- //
- private void checkReplicationTracker(UpdateCommand cmd) {
- if (zkEnabled == false) {
- rollupReplicationTracker = null; // never need one of these in stand-alone
- leaderReplicationTracker = null;
- return;
- }
-
- SolrParams rp = cmd.getReq().getParams();
- String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM);
- // Ok,we're receiving the original request, we need a rollup tracker, but only one so we accumulate over the
- // course of a batch.
- if ((distribUpdate == null || DistribPhase.NONE.toString().equals(distribUpdate)) &&
- rollupReplicationTracker == null) {
- rollupReplicationTracker = new RollupRequestReplicationTracker();
- }
- // If we're a leader, we need a leader replication tracker, so let's do that. If there are multiple docs in
- // a batch we need to use the _same_ leader replication tracker.
- if (isLeader && leaderReplicationTracker == null) {
- leaderReplicationTracker = new LeaderRequestReplicationTracker(
- req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
- }
- }
-
-
- @Override
- protected void doClose() {
- if (cmdDistrib != null) {
- cmdDistrib.close();
- }
- }
-
- // TODO: optionally fail if n replicas are not reached...
- private void doFinish() {
- boolean shouldUpdateTerms = isLeader && isIndexChanged;
- if (shouldUpdateTerms) {
- ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
- if (skippedCoreNodeNames != null) {
- zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames);
- }
- zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
- }
- // TODO: if not a forward and replication req is not specified, we could
- // send in a background thread
-
- cmdDistrib.finish();
- List<Error> errors = cmdDistrib.getErrors();
- // TODO - we may need to tell about more than one error...
-
- List<Error> errorsForClient = new ArrayList<>(errors.size());
- Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
- for (final SolrCmdDistributor.Error error : errors) {
-
- if (error.req.node instanceof ForwardNode) {
- // if it's a forward, any fail is a problem -
- // otherwise we assume things are fine if we got it locally
- // until we start allowing min replication param
- errorsForClient.add(error);
- continue;
- }
-
- // else...
-
- // for now we don't error - we assume if it was added locally, we
- // succeeded
- if (log.isWarnEnabled()) {
- log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e);
- }
-
- // Since it is not a forward request, for each fail, try to tell them to
- // recover - the doc was already added locally, so it should have been
- // legit
-
- DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
- if (phase != DistribPhase.FROMLEADER)
- continue; // don't have non-leaders try to recovery other nodes
-
- // commits are special -- they can run on any node irrespective of whether it is a leader or not
- // we don't want to run recovery on a node which missed a commit command
- if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null)
- continue;
-
- final String replicaUrl = error.req.node.getUrl();
-
- // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
- String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
- if ("LeaderChanged".equals(cause)) {
- // let's just fail this request and let the client retry? or just call processAdd again?
- log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
- " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
- errorsForClient.add(error);
- continue;
- }
-
- String collection = null;
- String shardId = null;
-
- if (error.req.node instanceof StdNode) {
- StdNode stdNode = (StdNode)error.req.node;
- collection = stdNode.getCollection();
- shardId = stdNode.getShardId();
-
- // before we go setting other replicas to down, make sure we're still the leader!
- String leaderCoreNodeName = null;
- Exception getLeaderExc = null;
- Replica leaderProps = null;
- try {
- leaderProps = zkController.getZkStateReader().getLeader(collection, shardId);
- if (leaderProps != null) {
- leaderCoreNodeName = leaderProps.getName();
- }
- } catch (Exception exc) {
- getLeaderExc = exc;
- }
- if (leaderCoreNodeName == null) {
- log.warn("Failed to determine if {} is still the leader for collection={} shardId={} " +
- "before putting {} into leader-initiated recovery",
- cloudDesc.getCoreNodeName(), collection, shardId, replicaUrl, getLeaderExc);
- }
-
- List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader().getReplicaProps(collection,
- cloudDesc.getShardId(), cloudDesc.getCoreNodeName());
- boolean foundErrorNodeInReplicaList = false;
- if (myReplicas != null) {
- for (ZkCoreNodeProps replicaProp : myReplicas) {
- if (((Replica) replicaProp.getNodeProps()).getName().equals(((Replica)stdNode.getNodeProps().getNodeProps()).getName())) {
- foundErrorNodeInReplicaList = true;
- break;
- }
- }
- }
-
- if (leaderCoreNodeName != null && cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) // we are still same leader
- && foundErrorNodeInReplicaList // we found an error for one of replicas
- && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
- try {
- String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
- // if false, then the node is probably not "live" anymore
- // and we do not need to send a recovery message
- Throwable rootCause = SolrException.getRootCause(error.e);
- log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
- replicasShouldBeInLowerTerms.add(coreNodeName);
- } catch (Exception exc) {
- Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
- log.error("Leader failed to set replica " +
- error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
- }
- } else {
- // not the leader anymore maybe or the error'd node is not my replica?
- if (!foundErrorNodeInReplicaList) {
- log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
- cloudDesc.getShardId()+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
- "No request recovery command will be sent!");
- if (!shardId.equals(cloudDesc.getShardId())) {
- // some replicas on other shard did not receive the updates (ex: during splitshard),
- // exception must be notified to clients
- errorsForClient.add(error);
- }
- } else {
- log.warn("Core " + cloudDesc.getCoreNodeName() + " is no longer the leader for " + collection + " "
- + shardId + " or we tried to put ourself into LIR, no request recovery command will be sent!");
- }
- }
- }
- }
- if (!replicasShouldBeInLowerTerms.isEmpty()) {
- zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
- .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms);
- }
- handleReplicationFactor();
- if (0 < errorsForClient.size()) {
- throw new DistributedUpdatesAsyncException(errorsForClient);
- }
- }
-
- /**
- * If necessary, include in the response the achieved replication factor
- */
- @SuppressWarnings("deprecation")
- private void handleReplicationFactor() {
- if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
- int achievedRf = Integer.MAX_VALUE;
-
- if (leaderReplicationTracker != null) {
-
- achievedRf = leaderReplicationTracker.getAchievedRf();
-
- // Transfer this to the rollup tracker if it exists
- if (rollupReplicationTracker != null) {
- rollupReplicationTracker.testAndSetAchievedRf(achievedRf);
- }
- }
-
- // Rollup tracker has accumulated stats.
- if (rollupReplicationTracker != null) {
- achievedRf = rollupReplicationTracker.getAchievedRf();
- }
- if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
- // Unused, but kept for back compatibility. To be removed in Solr 9
- rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, Integer.parseInt(req.getParams().get(UpdateRequest.MIN_REPFACT)));
- }
- rsp.getResponseHeader().add(UpdateRequest.REPFACT, achievedRf);
- rollupReplicationTracker = null;
- leaderReplicationTracker = null;
-
- }
+ protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
+ // no-op for derived classes to implement
}
// must be synchronized by bucket
@@ -1217,17 +464,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
- boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
-
- SolrInputDocument clonedDoc = null;
- if (willDistrib && cloneRequiredOnLeader) {
- clonedDoc = cmd.solrDoc.deepCopy();
- }
+ SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
// TODO: possibly set checkDeleteByQueries as a flag on the command?
doLocalAdd(cmd);
- if (willDistrib && cloneRequiredOnLeader) {
+ if (clonedDoc != null) {
cmd.solrDoc = clonedDoc;
}
} finally {
@@ -1244,6 +486,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
+ /**
+ *
+ * @return whether cmd doc should be cloned before localAdd
+ */
+ protected boolean shouldCloneCmdDoc() {
+ return false;
+ }
+
@VisibleForTesting
boolean shouldBufferUpdate(AddUpdateCommand cmd, boolean isReplayOrPeersync, UpdateLog.State state) {
if (state == UpdateLog.State.APPLYING_BUFFERED
@@ -1347,23 +597,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
params.set("onlyIfActive", true);
SolrRequest<SimpleSolrResponse> ur = new GenericSolrRequest(METHOD.GET, "/get", params);
- String leaderUrl = req.getParams().get(DISTRIB_FROM);
-
- if (leaderUrl == null) {
- // An update we're dependent upon didn't arrive! This is unexpected. Perhaps likely our leader is
- // down or partitioned from us for some reason. Lets force refresh cluster state, and request the
- // leader for the update.
- if (zkController == null) { // we should be in cloud mode, but wtf? could be a unit test
- throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id + ", but fetching from leader "
- + "failed since we're not in cloud mode.");
- }
- Replica leader;
- try {
- leader = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId());
- } catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
- }
- leaderUrl = leader.getCoreUrl();
+ String leaderUrl = getLeaderUrl(id);
+
+ if(leaderUrl == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id);
}
NamedList<Object> rsp;
@@ -1433,11 +670,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
-
- if (isReadOnly()) {
- throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
- }
-
+
updateCommand = cmd;
if (!cmd.isDeleteById()) {
@@ -1451,17 +684,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// have any documents specified by those IDs, the request is not forwarded to any other replicas on that shard. Thus
// we have to spoof the replicationTracker and set the achieved rf to the number of active replicas.
//
- private void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
- if (zkEnabled) {
- zkCheck();
- nodes = setupRequest(cmd.getId(), null, cmd.getRoute());
- } else {
- isLeader = getNonZkLeaderAssumption(req);
- }
+ protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
- // check if client has requested minimum replication factor information. will set replicationTracker to null if
- // we aren't the leader or subShardLeader
- checkReplicationTracker(cmd);
+ setupRequest(cmd);
boolean dropCmd = false;
if (!forwardToLeader) {
@@ -1473,45 +698,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
- if (zkEnabled && isLeader && !isSubShardLeader) {
- DocCollection coll = zkController.getClusterState().getCollection(collection);
- List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null);
- // the list<node> will actually have only one element for an add request
- if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
- params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
- cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null);
- }
-
- final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null);
- if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
- params.set(DISTRIB_FROM_COLLECTION, collection);
- params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
- cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, null, null);
- }
- }
-
- if (nodes != null) {
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM,
- (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString()
- : DistribPhase.TOLEADER.toString()));
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
-
- if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
- // TODO: Kept for rolling upgrades only. Remove in Solr 9
- params.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
- }
- cmdDistrib.distribDelete(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
- }
+ doDistribDeleteById(cmd);
// cmd.getIndexId == null when delete by query
// TODO: what to do when no idField?
@@ -1526,6 +713,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
+ /**
+ * This method can be overridden to tamper with the cmd after the localDeleteById operation
+ * @param cmd the delete command
+ * @throws IOException in case post processing failed
+ */
+ protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException {
+ // no-op for derived classes to implement
+ }
+
/** @see DistributedUpdateProcessorFactory#addParamToDistributedRequestWhitelist */
@SuppressWarnings("unchecked")
protected ModifiableSolrParams filterParams(SolrParams params) {
@@ -1549,103 +745,24 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
- public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
-
+ /**
+ * for implementing classes to setup request data(nodes, replicas)
+ * @param cmd the delete command being processed
+ */
+ protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
// even in non zk mode, tests simulate updates from a leader
- if(!zkEnabled) {
- isLeader = getNonZkLeaderAssumption(req);
- } else {
- zkCheck();
- }
-
- // NONE: we are the first to receive this deleteByQuery
- // - it must be forwarded to the leader of every shard
- // TO: we are a leader receiving a forwarded deleteByQuery... we must:
- // - block all updates (use VersionInfo)
- // - flush *all* updates going to our replicas
- // - forward the DBQ to our replicas and wait for the response
- // - log + execute the local DBQ
- // FROM: we are a replica receiving a DBQ from our leader
- // - log + execute the local DBQ
- DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
-
- DocCollection coll = zkEnabled
- ? zkController.getClusterState().getCollection(collection) : null;
-
- if (zkEnabled && DistribPhase.NONE == phase) {
- if (rollupReplicationTracker == null) {
- rollupReplicationTracker = new RollupRequestReplicationTracker();
- }
- boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
-
- ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
- outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
- outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
-
- SolrParams params = req.getParams();
- String route = params.get(ShardParams._ROUTE_);
- Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);
-
- List<Node> leaders = new ArrayList<>(slices.size());
- for (Slice slice : slices) {
- String sliceName = slice.getName();
- Replica leader;
- try {
- leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
- } catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
- }
-
- // TODO: What if leaders changed in the meantime?
- // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
-
- // Am I the leader for this slice?
- ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
- String leaderCoreNodeName = leader.getName();
- String coreNodeName = cloudDesc.getCoreNodeName();
- isLeader = coreNodeName.equals(leaderCoreNodeName);
-
- if (isLeader) {
- // don't forward to ourself
- leaderForAnyShard = true;
- } else {
- leaders.add(new ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
- }
- }
-
- outParams.remove("commit"); // this will be distributed from the local commit
-
-
- if (params.get(UpdateRequest.MIN_REPFACT) != null) {
- // TODO: Kept this for rolling upgrades. Remove in Solr 9
- outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
- }
- cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
-
- if (!leaderForAnyShard) {
- return;
- }
-
- // change the phase to TOLEADER so we look up and forward to our own replicas (if any)
- phase = DistribPhase.TOLEADER;
- }
-
- List<Node> replicas = null;
-
- if (zkEnabled && DistribPhase.TOLEADER == phase) {
- // This core should be a leader
- isLeader = true;
- replicas = setupRequestForDBQ();
- } else if (DistribPhase.FROMLEADER == phase) {
- isLeader = false;
- }
-
-
- // check if client has requested minimum replication factor information. will set replicationTracker to null if
- // we aren't the leader or subShardLeader
- checkReplicationTracker(cmd);
+ setupRequest(cmd);
+ doDeleteByQuery(cmd, null, null);
+ }
+ /**
+ * should be called by implementing class after setting up replicas
+ * @param cmd delete command
+ * @param replicas list of Nodes replicas to pass to {@link DistributedUpdateProcessor#doDistribDeleteByQuery(DeleteUpdateCommand, List, DocCollection)}
+ * @param coll the collection in zookeeper {@link org.apache.solr.common.cloud.DocCollection},
+ * passed to {@link DistributedUpdateProcessor#doDistribDeleteByQuery(DeleteUpdateCommand, List, DocCollection)}
+ */
+ protected void doDeleteByQuery(DeleteUpdateCommand cmd, List<SolrCmdDistributor.Node> replicas, DocCollection coll) throws IOException {
if (vinfo == null) {
super.processDelete(cmd);
return;
@@ -1654,78 +771,31 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
- boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
- boolean leaderLogic = isLeader && !isReplayOrPeersync;
versionDeleteByQuery(cmd);
- if (zkEnabled) {
- // forward to all replicas
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(CommonParams.VERSION_FIELD, Long.toString(cmd.getVersion()));
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
-
- boolean someReplicas = false;
- boolean subShardLeader = false;
- try {
- subShardLeader = amISubShardLeader(coll, null, null, null);
- if (subShardLeader) {
- String myShardId = cloudDesc.getShardId();
- Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
- collection, myShardId);
- // DBQ forwarded to NRT and TLOG replicas
- List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
- if (replicaProps != null) {
- final List<Node> myReplicas = new ArrayList<>(replicaProps.size());
- for (ZkCoreNodeProps replicaProp : replicaProps) {
- myReplicas.add(new StdNode(replicaProp, collection, myShardId));
- }
- cmdDistrib.distribDelete(cmd, myReplicas, params, false, rollupReplicationTracker, leaderReplicationTracker);
- someReplicas = true;
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
- }
- if (leaderLogic) {
- List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
- if (subShardLeaders != null) {
- cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker);
- }
- final List<Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null);
- if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
- params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
- params.set(DISTRIB_FROM_COLLECTION, collection);
- params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
-
- cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, rollupReplicationTracker, leaderReplicationTracker);
- }
- if (replicas != null) {
- cmdDistrib.distribDelete(cmd, replicas, params, false, rollupReplicationTracker, leaderReplicationTracker);
- someReplicas = true;
- }
- }
- if (someReplicas) {
- cmdDistrib.blockAndDoRetries();
- }
- }
+ doDistribDeleteByQuery(cmd, replicas, coll);
if (returnVersions && rsp != null) {
if (deleteByQueryResponse == null) {
deleteByQueryResponse = new NamedList<>(1);
- rsp.add("deleteByQuery",deleteByQueryResponse);
+ rsp.add("deleteByQuery", deleteByQueryResponse);
}
deleteByQueryResponse.add(cmd.getQuery(), cmd.getVersion());
}
}
+ /**
+ * This runs after versionDeleteByQuery is invoked, should be used to tamper or forward DeleteCommand
+ * @param cmd delete command
+ * @param replicas list of Nodes replicas
+ * @param coll the collection in zookeeper {@link org.apache.solr.common.cloud.DocCollection}.
+ * @throws IOException in case post processing failed
+ */
+ protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List<Node> replicas, DocCollection coll) throws IOException {
+ // no-op for derived classes to implement
+ }
+
protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
// Find the version
long versionOnUpdate = cmd.getVersion();
@@ -1780,46 +850,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
- // internal helper method to tell if we are the leader for an add or deleteById update
+ // internal helper method to setup request by processors who use this class.
// NOTE: not called by this class!
- boolean isLeader(UpdateCommand cmd) {
+ void setupRequest(UpdateCommand cmd) {
updateCommand = cmd;
-
- if (zkEnabled) {
- zkCheck();
- if (cmd instanceof AddUpdateCommand) {
- AddUpdateCommand acmd = (AddUpdateCommand)cmd;
- nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument());
- } else if (cmd instanceof DeleteUpdateCommand) {
- DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
- nodes = setupRequest(dcmd.getId(), null);
- }
- } else {
- isLeader = getNonZkLeaderAssumption(req);
- }
-
- return isLeader;
+ isLeader = getNonZkLeaderAssumption(req);
}
- private void zkCheck() {
-
- // Streaming updates can delay shutdown and cause big update reorderings (new streams can't be
- // initiated, but existing streams carry on). This is why we check if the CC is shutdown.
- // See SOLR-8203 and loop HdfsChaosMonkeyNothingIsSafeTest (and check for inconsistent shards) to test.
- if (req.getCore().getCoreContainer().isShutDown()) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutting down.");
- }
-
- if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
- // for log reply or peer sync, we don't need to be connected to ZK
- return;
- }
-
- if (!zkController.getZkClient().getConnectionManager().isLikelyExpired()) {
- return;
- }
-
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
+ /**
+ *
+ * @param id id of doc
+ * @return url of leader, or null if not found.
+ */
+ protected String getLeaderUrl(String id) {
+ return req.getParams().get(DISTRIB_FROM);
}
protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
@@ -1946,101 +990,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public void processCommit(CommitUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
-
- if (isReadOnly()) {
- throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
- }
-
- updateCommand = cmd;
- List<Node> nodes = null;
- Replica leaderReplica = null;
- if (zkEnabled) {
- zkCheck();
- try {
- leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId());
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
- }
- isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
-
- nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
- if (nodes == null) {
- // This could happen if there are only pull replicas
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT);
- }
-
- nodes.removeIf((node) -> node.getNodeProps().getNodeName().equals(zkController.getNodeName())
- && node.getNodeProps().getCoreName().equals(req.getCore().getName()));
- }
- if (!zkEnabled || (!isLeader && req.getParams().get(COMMIT_END_POINT, "").equals("replicas"))) {
- if (replicaType == Replica.Type.TLOG) {
-
- if (isLeader) {
- long commitVersion = vinfo.getNewClock();
- cmd.setVersion(commitVersion);
- doLocalCommit(cmd);
- }
-
- } else if (replicaType == Replica.Type.PULL) {
- log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
- } else {
- // NRT replicas will always commit
- if (vinfo != null) {
- long commitVersion = vinfo.getNewClock();
- cmd.setVersion(commitVersion);
- }
-
- doLocalCommit(cmd);
- }
- } else {
- ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
-
- List<Node> useNodes = null;
- if (req.getParams().get(COMMIT_END_POINT) == null) {
- useNodes = nodes;
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
- params.set(COMMIT_END_POINT, "leaders");
- if (useNodes != null) {
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
- cmdDistrib.distribCommit(cmd, useNodes, params);
- cmdDistrib.blockAndDoRetries();
- }
- }
-
- if (isLeader) {
- params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
-
- params.set(COMMIT_END_POINT, "replicas");
-
- useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
-
- if (useNodes != null) {
- params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
- zkController.getBaseUrl(), req.getCore().getName()));
-
- cmdDistrib.distribCommit(cmd, useNodes, params);
- }
- // NRT replicas will always commit
- if (vinfo != null) {
- long commitVersion = vinfo.getNewClock();
- cmd.setVersion(commitVersion);
- }
+ updateCommand = cmd;
- doLocalCommit(cmd);
- if (useNodes != null) {
- cmdDistrib.blockAndDoRetries();
- }
- }
- }
+ // replica type can only be NRT in standalone mode
+ // NRT replicas will always commit
+ doLocalCommit(cmd);
}
- private void doLocalCommit(CommitUpdateCommand cmd) throws IOException {
+ protected void doLocalCommit(CommitUpdateCommand cmd) throws IOException {
if (vinfo != null) {
+ long commitVersion = vinfo.getNewClock();
+ cmd.setVersion(commitVersion);
vinfo.lockForUpdate();
}
try {
@@ -2059,66 +1021,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
@Override
- public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
- if (isReadOnly()) {
- throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
- }
- super.processMergeIndexes(cmd);
- }
+ public void finish() throws IOException {
+ assertNotFinished();
- @Override
- public void processRollback(RollbackUpdateCommand cmd) throws IOException {
- if (isReadOnly()) {
- throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
- }
- super.processRollback(cmd);
+ super.finish();
}
- @Override
- public void finish() throws IOException {
+ protected void assertNotFinished() {
assert ! finished : "lifecycle sanity check";
finished = true;
-
- if (zkEnabled) doFinish();
-
- if (next != null && nodes == null) next.finish();
- }
-
- private List<Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
- ClusterState clusterState = zkController.getClusterState();
- final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
- if (collection == null || docCollection.getSlicesMap() == null) {
- throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
- "Could not find collection in zk: " + clusterState);
- }
- Map<String,Slice> slices = docCollection.getSlicesMap();
- final List<Node> urls = new ArrayList<>(slices.size());
- for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
- Slice replicas = slices.get(sliceEntry.getKey());
- if (onlyLeaders) {
- Replica replica = docCollection.getLeader(replicas.getName());
- if (replica != null) {
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
- urls.add(new StdNode(nodeProps, collection, replicas.getName()));
- }
- continue;
- }
- Map<String,Replica> shardMap = replicas.getReplicasMap();
-
- for (Entry<String,Replica> entry : shardMap.entrySet()) {
- if (!types.contains(entry.getValue().getType())) {
- continue;
- }
- ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
- if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
- urls.add(new StdNode(nodeProps, collection, replicas.getName()));
- }
- }
- }
- if (urls.isEmpty()) {
- return null;
- }
- return urls;
}
/**
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 4addae0..93c1bf2 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -49,9 +49,16 @@ public class DistributedUpdateProcessorFactory
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
+
+ final boolean isZkAware = req.getCore().getCoreContainer().isZooKeeperAware();
+
+ DistributedUpdateProcessor distribUpdateProcessor =
+ isZkAware ?
+ new DistributedZkUpdateProcessor(req, rsp, next) :
+ new DistributedUpdateProcessor(req, rsp, next);
// note: will sometimes return DURP (no overhead) instead of wrapping
return RoutedAliasUpdateProcessor.wrap(req,
- new DistributedUpdateProcessor(req, rsp, next));
+ distribUpdateProcessor);
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
new file mode 100644
index 0000000..abe4754
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -0,0 +1,1235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.RoutingRule;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.MergeIndexesCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.util.TestInjection;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
+
+ private final CloudDescriptor cloudDesc;
+ private final ZkController zkController;
+ private final SolrCmdDistributor cmdDistrib;
+ protected List<SolrCmdDistributor.Node> nodes;
+ private Set<String> skippedCoreNodeNames;
+ private final String collection;
+ private boolean readOnlyCollection = false;
+
+ // should we clone the document before sending it to replicas?
+ // this is set to true in the constructor if the next processors in the chain
+ // are custom and may modify the SolrInputDocument racing with its serialization for replication
+ private final boolean cloneRequiredOnLeader;
+
+ //used for keeping track of replicas that have processed an add/update from the leader
+ private RollupRequestReplicationTracker rollupReplicationTracker = null;
+ private LeaderRequestReplicationTracker leaderReplicationTracker = null;
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public DistributedZkUpdateProcessor(SolrQueryRequest req,
+ SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ super(req, rsp, next);
+ CoreContainer cc = req.getCore().getCoreContainer();
+ cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
+ zkController = cc.getZkController();
+ cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
+ cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
+ collection = cloudDesc.getCollectionName();
+ DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);
+ if (coll != null) {
+ // check readOnly property in coll state
+ readOnlyCollection = coll.isReadOnly();
+ }
+ }
+
+ private boolean isReadOnly() {
+ return readOnlyCollection || req.getCore().readOnly;
+ }
+
+ private boolean isCloneRequiredOnLeader(UpdateRequestProcessor next) {
+ boolean shouldClone = false;
+ UpdateRequestProcessor nextInChain = next;
+ while (nextInChain != null) {
+ Class<? extends UpdateRequestProcessor> klass = nextInChain.getClass();
+ if (klass != LogUpdateProcessorFactory.LogUpdateProcessor.class
+ && klass != RunUpdateProcessor.class
+ && klass != TolerantUpdateProcessor.class) {
+ shouldClone = true;
+ break;
+ }
+ nextInChain = nextInChain.next;
+ }
+ return shouldClone;
+ }
+
+ @Override
+ protected Replica.Type computeReplicaType() {
+ // can't use cloudDesc since this is called by super class, before the constructor instantiates cloudDesc.
+ return req.getCore().getCoreDescriptor().getCloudDescriptor().getReplicaType();
+ }
+
+ @Override
+ public void processCommit(CommitUpdateCommand cmd) throws IOException {
+
+ assert TestInjection.injectFailUpdateRequests();
+
+ if (isReadOnly()) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+ }
+
+ updateCommand = cmd;
+
+ List<SolrCmdDistributor.Node> nodes = null;
+ Replica leaderReplica = null;
+ zkCheck();
+ try {
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId());
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+ }
+ isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
+
+ nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
+ if (nodes == null) {
+ // This could happen if there are only pull replicas
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT);
+ }
+
+ nodes.removeIf((node) -> node.getNodeProps().getNodeName().equals(zkController.getNodeName())
+ && node.getNodeProps().getCoreName().equals(req.getCore().getName()));
+
+ if (!isLeader && req.getParams().get(COMMIT_END_POINT, "").equals("replicas")) {
+ if (replicaType == Replica.Type.PULL) {
+ log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
+ } else if (replicaType == Replica.Type.NRT) {
+ doLocalCommit(cmd);
+ }
+ } else {
+ // zk
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+
+ List<SolrCmdDistributor.Node> useNodes = null;
+ if (req.getParams().get(COMMIT_END_POINT) == null) {
+ useNodes = nodes;
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+ params.set(COMMIT_END_POINT, "leaders");
+ if (useNodes != null) {
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ cmdDistrib.distribCommit(cmd, useNodes, params);
+ cmdDistrib.blockAndDoRetries();
+ }
+ }
+
+ if (isLeader) {
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+
+ params.set(COMMIT_END_POINT, "replicas");
+
+ useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+
+ if (useNodes != null) {
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+
+ cmdDistrib.distribCommit(cmd, useNodes, params);
+ }
+
+ doLocalCommit(cmd);
+
+ if (useNodes != null) {
+ cmdDistrib.blockAndDoRetries();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void processAdd(AddUpdateCommand cmd) throws IOException {
+ assert TestInjection.injectFailUpdateRequests();
+
+ if (isReadOnly()) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+ }
+
+ setupRequest(cmd);
+
+ // check if client has requested minimum replication factor information. will set replicationTracker to null if
+ // we aren't the leader or subShardLeader
+ checkReplicationTracker(cmd);
+
+ super.processAdd(cmd);
+ }
+
+ @Override
+ protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
+
+ if (isLeader && !isSubShardLeader) {
+ DocCollection coll = zkController.getClusterState().getCollection(collection);
+ List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument());
+ // the list<node> will actually have only one element for an add request
+ if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
+ cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
+ }
+ final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument());
+ if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ params.set(DISTRIB_FROM_COLLECTION, collection);
+ params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
+ cmdDistrib.distribAdd(cmd, nodesByRoutingRules, params, true);
+ }
+ }
+
+ if (nodes != null) {
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(DISTRIB_UPDATE_PARAM,
+ (isLeader || isSubShardLeader ?
+ DistribPhase.FROMLEADER.toString() :
+ DistribPhase.TOLEADER.toString()));
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+
+ if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
+ // TODO: Kept for rolling upgrades only. Should be removed in Solr 9
+ params.set(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
+ }
+
+ if (cmd.isInPlaceUpdate()) {
+ params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
+
+ // Use synchronous=true so that a new connection is used, instead
+ // of the update being streamed through an existing streaming client.
+ // When using a streaming client, the previous update
+ // and the current in-place update (that depends on the previous update), if reordered
+ // in the stream, can result in the current update being bottled up behind the previous
+ // update in the stream and can lead to degraded performance.
+ cmdDistrib.distribAdd(cmd, nodes, params, true, rollupReplicationTracker, leaderReplicationTracker);
+ } else {
+ cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
+ }
+ }
+ }
+
+ @Override
+ public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+ if (isReadOnly()) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+ }
+
+ super.processDelete(cmd);
+ }
+
+ @Override
+ protected void doDeleteById(DeleteUpdateCommand cmd) throws IOException {
+ setupRequest(cmd);
+
+ // check if client has requested minimum replication factor information. will set replicationTracker to null if
+ // we aren't the leader or subShardLeader
+ checkReplicationTracker(cmd);
+
+ super.doDeleteById(cmd);
+ }
+
+ @Override
+ protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException {
+ if (isLeader && !isSubShardLeader) {
+ DocCollection coll = zkController.getClusterState().getCollection(collection);
+ List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null);
+      // the list<node> will actually have only one element for a delete request
+ if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
+ cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null);
+ }
+
+ final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null);
+ if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ params.set(DISTRIB_FROM_COLLECTION, collection);
+ params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
+ cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, null, null);
+ }
+ }
+
+ if (nodes != null) {
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(DISTRIB_UPDATE_PARAM,
+ (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString()
+ : DistribPhase.TOLEADER.toString()));
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+
+ if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
+ // TODO: Kept for rolling upgrades only. Remove in Solr 9
+ params.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
+ }
+ cmdDistrib.distribDelete(cmd, nodes, params, false, rollupReplicationTracker, leaderReplicationTracker);
+ }
+ }
+
+ @Override
+ protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+ zkCheck();
+
+ // NONE: we are the first to receive this deleteByQuery
+ // - it must be forwarded to the leader of every shard
+ // TO: we are a leader receiving a forwarded deleteByQuery... we must:
+ // - block all updates (use VersionInfo)
+ // - flush *all* updates going to our replicas
+ // - forward the DBQ to our replicas and wait for the response
+ // - log + execute the local DBQ
+ // FROM: we are a replica receiving a DBQ from our leader
+ // - log + execute the local DBQ
+ DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+ DocCollection coll = zkController.getClusterState().getCollection(collection);
+
+ if (DistribPhase.NONE == phase) {
+ if (rollupReplicationTracker == null) {
+ rollupReplicationTracker = new RollupRequestReplicationTracker();
+ }
+ boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
+
+ ModifiableSolrParams outParams = new ModifiableSolrParams(filterParams(req.getParams()));
+ outParams.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+ outParams.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+
+ SolrParams params = req.getParams();
+ String route = params.get(ShardParams._ROUTE_);
+ Collection<Slice> slices = coll.getRouter().getSearchSlices(route, params, coll);
+
+ List<SolrCmdDistributor.Node> leaders = new ArrayList<>(slices.size());
+ for (Slice slice : slices) {
+ String sliceName = slice.getName();
+ Replica leader;
+ try {
+ leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + sliceName, e);
+ }
+
+ // TODO: What if leaders changed in the meantime?
+ // should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
+
+ // Am I the leader for this slice?
+ ZkCoreNodeProps coreLeaderProps = new ZkCoreNodeProps(leader);
+ String leaderCoreNodeName = leader.getName();
+ String coreNodeName = cloudDesc.getCoreNodeName();
+ isLeader = coreNodeName.equals(leaderCoreNodeName);
+
+ if (isLeader) {
+ // don't forward to ourself
+ leaderForAnyShard = true;
+ } else {
+ leaders.add(new SolrCmdDistributor.ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
+ }
+ }
+
+ outParams.remove("commit"); // this will be distributed from the local commit
+
+
+ if (params.get(UpdateRequest.MIN_REPFACT) != null) {
+ // TODO: Kept this for rolling upgrades. Remove in Solr 9
+ outParams.add(UpdateRequest.MIN_REPFACT, req.getParams().get(UpdateRequest.MIN_REPFACT));
+ }
+ cmdDistrib.distribDelete(cmd, leaders, outParams, false, rollupReplicationTracker, null);
+
+ if (!leaderForAnyShard) {
+ return;
+ }
+
+ // change the phase to TOLEADER so we look up and forward to our own replicas (if any)
+ phase = DistribPhase.TOLEADER;
+ }
+ List<SolrCmdDistributor.Node> replicas = null;
+
+ if (DistribPhase.TOLEADER == phase) {
+ // This core should be a leader
+ isLeader = true;
+ replicas = setupRequestForDBQ();
+ } else if (DistribPhase.FROMLEADER == phase) {
+ isLeader = false;
+ }
+
+ // check if client has requested minimum replication factor information. will set replicationTracker to null if
+ // we aren't the leader or subShardLeader
+ checkReplicationTracker(cmd);
+ super.doDeleteByQuery(cmd, replicas, coll);
+ }
+
+ @Override
+ protected void doDistribDeleteByQuery(DeleteUpdateCommand cmd, List<SolrCmdDistributor.Node> replicas,
+ DocCollection coll) throws IOException {
+
+ boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
+ boolean leaderLogic = isLeader && !isReplayOrPeersync;
+
+ // forward to all replicas
+ ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(CommonParams.VERSION_FIELD, Long.toString(cmd.getVersion()));
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+
+ boolean someReplicas = false;
+ boolean subShardLeader = false;
+ try {
+ subShardLeader = amISubShardLeader(coll, null, null, null);
+ if (subShardLeader) {
+ String myShardId = cloudDesc.getShardId();
+ Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+ collection, myShardId);
+ // DBQ forwarded to NRT and TLOG replicas
+ List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+ .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ if (replicaProps != null) {
+ final List<SolrCmdDistributor.Node> myReplicas = new ArrayList<>(replicaProps.size());
+ for (ZkCoreNodeProps replicaProp : replicaProps) {
+ myReplicas.add(new SolrCmdDistributor.StdNode(replicaProp, collection, myShardId));
+ }
+ cmdDistrib.distribDelete(cmd, myReplicas, params, false, rollupReplicationTracker, leaderReplicationTracker);
+ someReplicas = true;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ if (leaderLogic) {
+ List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
+ if (subShardLeaders != null) {
+ cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker);
+ }
+ final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null);
+ if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
+ params = new ModifiableSolrParams(filterParams(req.getParams()));
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+ params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
+ zkController.getBaseUrl(), req.getCore().getName()));
+ params.set(DISTRIB_FROM_COLLECTION, collection);
+ params.set(DISTRIB_FROM_SHARD, cloudDesc.getShardId());
+
+ cmdDistrib.distribDelete(cmd, nodesByRoutingRules, params, true, rollupReplicationTracker, leaderReplicationTracker);
+ }
+ if (replicas != null) {
+ cmdDistrib.distribDelete(cmd, replicas, params, false, rollupReplicationTracker, leaderReplicationTracker);
+ someReplicas = true;
+ }
+ }
+
+ if (someReplicas) {
+ cmdDistrib.blockAndDoRetries();
+ }
+ }
+
+ // used for deleteByQuery to get the list of nodes this leader should forward to
+ private List<SolrCmdDistributor.Node> setupRequestForDBQ() {
+ List<SolrCmdDistributor.Node> nodes = null;
+ String shardId = cloudDesc.getShardId();
+
+ try {
+ Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+ isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
+
+ // TODO: what if we are no longer the leader?
+
+ forwardToLeader = false;
+ List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
+ .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ if (replicaProps != null) {
+ nodes = new ArrayList<>(replicaProps.size());
+ for (ZkCoreNodeProps props : replicaProps) {
+ nodes.add(new SolrCmdDistributor.StdNode(props, collection, shardId));
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+
+ return nodes;
+ }
+
+ @Override
+ protected String getLeaderUrl(String id) {
+ // try get leader from req params, fallback to zk lookup if not found.
+ String distribFrom = req.getParams().get(DISTRIB_FROM);
+ if(distribFrom != null) {
+ return distribFrom;
+ }
+ return getLeaderUrlZk(id);
+ }
+
+ private String getLeaderUrlZk(String id) {
+ // An update we're dependent upon didn't arrive! This is unexpected. Perhaps likely our leader is
+ // down or partitioned from us for some reason. Lets force refresh cluster state, and request the
+ // leader for the update.
+ if (zkController == null) { // we should be in cloud mode, but wtf? could be a unit test
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find document with id=" + id + ", but fetching from leader "
+ + "failed since we're not in cloud mode.");
+ }
+ try {
+ return zkController.getZkStateReader().getLeaderRetry(collection, cloudDesc.getShardId()).getCoreUrl();
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
+ }
+ }
+
+ @Override
+ void setupRequest(UpdateCommand cmd) {
+ updateCommand = cmd;
+ zkCheck();
+ if (cmd instanceof AddUpdateCommand) {
+ AddUpdateCommand acmd = (AddUpdateCommand)cmd;
+ nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument());
+ } else if (cmd instanceof DeleteUpdateCommand) {
+ DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
+ nodes = setupRequest(dcmd.getId(), null);
+ }
+ }
+
+ protected List<SolrCmdDistributor.Node> setupRequest(String id, SolrInputDocument doc) {
+ return setupRequest(id, doc, null);
+ }
+
+ protected List<SolrCmdDistributor.Node> setupRequest(String id, SolrInputDocument doc, String route) {
+ // if we are in zk mode...
+
+ assert TestInjection.injectUpdateRandomPause();
+
+ if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+ isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
+ forwardToLeader = false;
+ return null;
+ }
+
+ ClusterState cstate = zkController.getClusterState();
+ DocCollection coll = cstate.getCollection(collection);
+ Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
+
+ if (slice == null) {
+ // No slice found. Most strict routers will have already thrown an exception, so a null return is
+ // a signal to use the slice of this core.
+ // TODO: what if this core is not in the targeted collection?
+ String shardId = cloudDesc.getShardId();
+ slice = coll.getSlice(shardId);
+ if (slice == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
+ }
+ }
+
+ DistribPhase phase =
+ DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+ if (DistribPhase.FROMLEADER == phase && !couldIbeSubShardLeader(coll)) {
+ if (cloudDesc.isLeader()) {
+ // locally we think we are leader but the request says it came FROMLEADER
+ // that could indicate a problem, let the full logic below figure it out
+ } else {
+
+ assert TestInjection.injectFailReplicaRequests();
+
+ isLeader = false; // we actually might be the leader, but we don't want leader-logic for these types of updates anyway.
+ forwardToLeader = false;
+ return null;
+ }
+ }
+
+ String shardId = slice.getName();
+
+ try {
+ // Not equivalent to getLeaderProps, which retries to find a leader.
+ // Replica leader = slice.getLeader();
+ Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+ isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
+
+ if (!isLeader) {
+ isSubShardLeader = amISubShardLeader(coll, slice, id, doc);
+ if (isSubShardLeader) {
+ shardId = cloudDesc.getShardId();
+ leaderReplica = zkController.getZkStateReader().getLeaderRetry(collection, shardId);
+ }
+ }
+
+ doDefensiveChecks(phase);
+
+ // if request is coming from another collection then we want it to be sent to all replicas
+ // even if its phase is FROMLEADER
+ String fromCollection = updateCommand.getReq().getParams().get(DISTRIB_FROM_COLLECTION);
+
+ if (DistribPhase.FROMLEADER == phase && !isSubShardLeader && fromCollection == null) {
+ // we are coming from the leader, just go local - add no urls
+ forwardToLeader = false;
+ return null;
+ } else if (isLeader || isSubShardLeader) {
+ // that means I want to forward onto my replicas...
+ // so get the replicas...
+ forwardToLeader = false;
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ String leaderCoreNodeName = leaderReplica.getName();
+ List<Replica> replicas = clusterState.getCollection(collection)
+ .getSlice(shardId)
+ .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
+ if (replicas.isEmpty()) {
+ return null;
+ }
+
+ // check for test param that lets us miss replicas
+ String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
+ Set<String> skipListSet = null;
+ if (skipList != null) {
+ skipListSet = new HashSet<>(skipList.length);
+ skipListSet.addAll(Arrays.asList(skipList));
+ log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
+ }
+
+ List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
+ skippedCoreNodeNames = new HashSet<>();
+ ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+ for (Replica replica: replicas) {
+ String coreNodeName = replica.getName();
+ if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
+ log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
+ } else if(zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
+ log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
+ skippedCoreNodeNames.add(replica.getName());
+ } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
+ skippedCoreNodeNames.add(replica.getName());
+ } else {
+ nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
+ }
+ }
+ return nodes;
+
+ } else {
+ // I need to forward on to the leader...
+ forwardToLeader = true;
+ return Collections.singletonList(
+ new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward));
+ }
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ }
+
+ /**
+ * Clone the incoming document only when this node is the leader and will actually
+ * distribute the update to other replica nodes, and the cloneRequiredOnLeader
+ * configuration flag is set.
+ */
+ @Override
+ protected boolean shouldCloneCmdDoc() {
+ boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
+ // Use the short-circuit logical && rather than the accidental bitwise '&';
+ // the result is identical for booleans, but && is the idiomatic operator here
+ // and matches the willDistrib computation above.
+ return willDistrib && cloneRequiredOnLeader;
+ }
+
+ // helper method, processAdd was getting a bit large.
+ // Sets replicationTracker = null if we aren't the leader
+ // We have two possibilities here:
+ //
+ // 1> we are a leader: Allocate a LeaderTracker and, if we're getting the original request, a RollupTracker
+ // 2> we're a follower: allocate a RollupTracker
+ //
+ private void checkReplicationTracker(UpdateCommand cmd) {
+
+ SolrParams rp = cmd.getReq().getParams();
+ String distribUpdate = rp.get(DISTRIB_UPDATE_PARAM);
+ // Ok, we're receiving the original request (no distrib phase, or phase NONE), so we need
+ // a rollup tracker, but only one so we accumulate over the course of a batch.
+ if ((distribUpdate == null || DistribPhase.NONE.toString().equals(distribUpdate)) &&
+ rollupReplicationTracker == null) {
+ rollupReplicationTracker = new RollupRequestReplicationTracker();
+ }
+ // If we're a leader, we need a leader replication tracker, so let's do that. If there are multiple docs in
+ // a batch we need to use the _same_ leader replication tracker.
+ if (isLeader && leaderReplicationTracker == null) {
+ leaderReplicationTracker = new LeaderRequestReplicationTracker(
+ req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
+ }
+ }
+
+
+ /**
+ * Builds the list of nodes for every slice of the given collection: either just each
+ * slice's leader (onlyLeaders == true) or every live replica whose type is in
+ * {@code types}. Returns null when no matching node is found.
+ */
+ private List<SolrCmdDistributor.Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
+ ClusterState clusterState = zkController.getClusterState();
+ final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
+ // Guard against a missing collection: getCollectionOrNull may return null, so the
+ // null check must be on docCollection itself. (Checking the 'collection' name here,
+ // as before, would still let a null docCollection reach getSlicesMap() and NPE.)
+ if (docCollection == null || docCollection.getSlicesMap() == null) {
+ throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST,
+ "Could not find collection in zk: " + clusterState);
+ }
+ Map<String,Slice> slices = docCollection.getSlicesMap();
+ final List<SolrCmdDistributor.Node> urls = new ArrayList<>(slices.size());
+ for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
+ Slice replicas = slices.get(sliceEntry.getKey());
+ if (onlyLeaders) {
+ // Only the slice leader is targeted; a null leader (e.g. mid-election) is skipped.
+ Replica replica = docCollection.getLeader(replicas.getName());
+ if (replica != null) {
+ ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
+ urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName()));
+ }
+ continue;
+ }
+ Map<String,Replica> shardMap = replicas.getReplicasMap();
+
+ for (Map.Entry<String,Replica> entry : shardMap.entrySet()) {
+ // Filter by the requested replica types (e.g. NRT/TLOG) and liveness.
+ if (!types.contains(entry.getValue().getType())) {
+ continue;
+ }
+ ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
+ if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
+ urls.add(new SolrCmdDistributor.StdNode(nodeProps, collection, replicas.getName()));
+ }
+ }
+ }
+ if (urls.isEmpty()) {
+ return null;
+ }
+ return urls;
+ }
+
+ /**
+ * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD}.
+ * Returns true when this core's own slice is in CONSTRUCTION or RECOVERY state, i.e. a
+ * sub-shard produced by a split, in which case this core could be that sub-shard's leader.
+ */
+ private boolean couldIbeSubShardLeader(DocCollection coll) {
+ // Could I be the leader of a shard in "construction/recovery" state?
+ String myShardId = cloudDesc.getShardId();
+ Slice mySlice = coll.getSlice(myShardId);
+ Slice.State state = mySlice.getState();
+ return state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY;
+ }
+
+ /**
+ * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD}.
+ * Returns true only when all of the following hold: this core's slice is in
+ * CONSTRUCTION/RECOVERY state, this core is that slice's leader (per ZK), and the given
+ * doc id falls in this sub-shard's hash range (for the delete-by-query case, id/doc may
+ * be irrelevant and leadership alone suffices).
+ *
+ * @throws InterruptedException if interrupted while waiting for the leader in ZK
+ */
+ protected boolean amISubShardLeader(DocCollection coll, Slice parentSlice, String id, SolrInputDocument doc) throws InterruptedException {
+ // Am I the leader of a shard in "construction/recovery" state?
+ String myShardId = cloudDesc.getShardId();
+ Slice mySlice = coll.getSlice(myShardId);
+ final Slice.State state = mySlice.getState();
+ if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
+ Replica myLeader = zkController.getZkStateReader().getLeaderRetry(collection, myShardId);
+ boolean amILeader = myLeader.getName().equals(cloudDesc.getCoreNodeName());
+ if (amILeader) {
+ // Does the document belong to my hash range as well?
+ // A null range is treated as the full hash space.
+ DocRouter.Range myRange = mySlice.getRange();
+ if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ if (parentSlice != null) {
+ boolean isSubset = parentSlice.getRange() != null && myRange.isSubsetOf(parentSlice.getRange());
+ return isSubset && coll.getRouter().isTargetSlice(id, doc, req.getParams(), myShardId, coll);
+ } else {
+ // delete by query case -- as long as I am a sub shard leader we're fine
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * From the leader's perspective, builds the list of NRT/TLOG replica nodes of the given
+ * shard that should receive this update. Excludes the leader itself, replicas on the
+ * test skip-list, replicas whose shard term says they should not receive updates, and
+ * replicas that are down or on non-live nodes; excluded cores are recorded in
+ * {@code skippedCoreNodeNames} so their terms can be raised later. Returns null when
+ * there are no other replicas.
+ *
+ * NOTE(review): this appears to duplicate the inline leader-branch logic earlier in this
+ * file (which additionally passes maxRetriesToFollowers to StdNode) — consider keeping
+ * the two in sync or consolidating.
+ */
+ protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ String leaderCoreNodeName = leaderReplica.getName();
+ List<Replica> replicas = clusterState.getCollection(collection)
+ .getSlice(shardId)
+ .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
+ if (replicas.isEmpty()) {
+ return null;
+ }
+
+ // check for test param that lets us miss replicas
+ String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
+ Set<String> skipListSet = null;
+ if (skipList != null) {
+ skipListSet = new HashSet<>(skipList.length);
+ skipListSet.addAll(Arrays.asList(skipList));
+ log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
+ }
+
+ List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
+ skippedCoreNodeNames = new HashSet<>();
+ ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+ for (Replica replica : replicas) {
+ String coreNodeName = replica.getName();
+ if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
+ log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
+ } else if (zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
+ // replica's registered term is behind the leader's; don't send, just record it
+ log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
+ skippedCoreNodeNames.add(replica.getName());
+ } else if (!clusterState.getLiveNodes().contains(replica.getNodeName())
+ || replica.getState() == Replica.State.DOWN) {
+ skippedCoreNodeNames.add(replica.getName());
+ } else {
+ nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(replica), collection, shardId));
+ }
+ }
+ return nodes;
+ }
+
+ /**
+ * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#SPLITSHARD}.
+ * Returns the leaders of any CONSTRUCTION/RECOVERY sub-shards whose hash range is a
+ * subset of the given shard's range and which are targets for the given doc (or for all
+ * docs when docId is null, the delete case). Returns null when there are none.
+ */
+ protected List<SolrCmdDistributor.Node> getSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc) {
+ Collection<Slice> allSlices = coll.getSlices();
+ List<SolrCmdDistributor.Node> nodes = null;
+ for (Slice aslice : allSlices) {
+ final Slice.State state = aslice.getState();
+ if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
+ // A null range on our own slice is treated as the full hash space.
+ DocRouter.Range myRange = coll.getSlice(shardId).getRange();
+ if (myRange == null) myRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ boolean isSubset = aslice.getRange() != null && aslice.getRange().isSubsetOf(myRange);
+ if (isSubset &&
+ (docId == null // in case of deletes
+ || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
+ Replica sliceLeader = aslice.getLeader();
+ // slice leader can be null because node/shard is created zk before leader election
+ if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
+ if (nodes == null) nodes = new ArrayList<>();
+ ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
+ nodes.add(new SolrCmdDistributor.StdNode(nodeProps, coll.getName(), aslice.getName()));
+ }
+ }
+ }
+ }
+ return nodes;
+ }
+
+ /**
+ * For {@link org.apache.solr.common.params.CollectionParams.CollectionAction#MIGRATE}.
+ * When this shard carries routing rules (CompositeIdRouter only), resolves the target
+ * collection leader(s) that should also receive this update: for delete-by-query
+ * (id == null) one leader per rule's target collection; for a specific id, the leader of
+ * the target slice whose hash range includes the id. Expired rules are asynchronously
+ * removed via the Overseer instead. Returns null when no rule applies.
+ */
+ protected List<SolrCmdDistributor.Node> getNodesByRoutingRules(ClusterState cstate, DocCollection coll, String id, SolrInputDocument doc) {
+ DocRouter router = coll.getRouter();
+ List<SolrCmdDistributor.Node> nodes = null;
+ if (router instanceof CompositeIdRouter) {
+ CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
+ String myShardId = cloudDesc.getShardId();
+ Slice slice = coll.getSlice(myShardId);
+ Map<String, RoutingRule> routingRules = slice.getRoutingRules();
+ if (routingRules != null) {
+
+ // delete by query case
+ if (id == null) {
+ // forward to one (arbitrary: the first active) slice leader per target collection
+ for (Map.Entry<String, RoutingRule> entry : routingRules.entrySet()) {
+ String targetCollectionName = entry.getValue().getTargetCollectionName();
+ final DocCollection docCollection = cstate.getCollectionOrNull(targetCollectionName);
+ if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) {
+ final Slice[] activeSlices = docCollection.getActiveSlicesArr();
+ Slice any = activeSlices[0];
+ if (nodes == null) nodes = new ArrayList<>();
+ nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(any.getLeader())));
+ }
+ }
+ return nodes;
+ }
+
+ // rules are keyed by "<routeKey>!"
+ String routeKey = SolrIndexSplitter.getRouteKey(id);
+ if (routeKey != null) {
+ RoutingRule rule = routingRules.get(routeKey + "!");
+ if (rule != null) {
+ if (! rule.isExpired()) {
+ List<DocRouter.Range> ranges = rule.getRouteRanges();
+ if (ranges != null && !ranges.isEmpty()) {
+ int hash = compositeIdRouter.sliceHash(id, doc, null, coll);
+ for (DocRouter.Range range : ranges) {
+ if (range.includes(hash)) {
+ DocCollection targetColl = cstate.getCollection(rule.getTargetCollectionName());
+ Collection<Slice> activeSlices = targetColl.getRouter().getSearchSlicesSingle(id, null, targetColl);
+ if (activeSlices == null || activeSlices.isEmpty()) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "No active slices serving " + id + " found for target collection: " + rule.getTargetCollectionName());
+ }
+ Replica targetLeader = targetColl.getLeader(activeSlices.iterator().next().getName());
+ nodes = new ArrayList<>(1);
+ nodes.add(new SolrCmdDistributor.StdNode(new ZkCoreNodeProps(targetLeader)));
+ break;
+ }
+ }
+ }
+ } else {
+ // rule has expired: best-effort removal via the Overseer; tryLock with a short
+ // timeout so only one request attempts it and update processing isn't blocked
+ ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock();
+ if (!ruleExpiryLock.isLocked()) {
+ try {
+ if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
+ log.info("Going to expire routing rule");
+ try {
+ Map<String, Object> map = Utils.makeMap(Overseer.QUEUE_OPERATION, OverseerAction.REMOVEROUTINGRULE.toLower(),
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.SHARD_ID_PROP, myShardId,
+ "routeKey", routeKey + "!");
+ zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
+ } catch (KeeperException e) {
+ log.warn("Exception while removing routing rule for route key: " + routeKey, e);
+ } catch (Exception e) {
+ log.error("Exception while removing routing rule for route key: " + routeKey, e);
+ } finally {
+ ruleExpiryLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ // restore interrupt status; rule removal is best-effort
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return nodes;
+ }
+
+ /**
+ * Sanity checks for distributed updates (skipped during log replay / peer sync):
+ * rejects FROMLEADER requests that reach a node which locally believes it is the leader
+ * (leader change, or an invalid parent/sub-shard relationship during a split), and
+ * fails if cluster state says we are the (sub-shard) leader but the local core
+ * descriptor disagrees, after a bounded wait for the local state to catch up.
+ */
+ private void doDefensiveChecks(DistribPhase phase) {
+ boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
+ if (isReplayOrPeersync) return;
+
+ String from = req.getParams().get(DISTRIB_FROM);
+ ClusterState clusterState = zkController.getClusterState();
+
+ DocCollection docCollection = clusterState.getCollection(collection);
+ Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
+ boolean localIsLeader = cloudDesc.isLeader();
+ if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
+ String fromShard = req.getParams().get(DISTRIB_FROM_PARENT);
+ if (fromShard != null) {
+ if (mySlice.getState() == Slice.State.ACTIVE) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "Request says it is coming from parent shard leader but we are in active state");
+ }
+ // shard splitting case -- check ranges to see if we are a sub-shard
+ Slice fromSlice = docCollection.getSlice(fromShard);
+ DocRouter.Range parentRange = fromSlice.getRange();
+ if (parentRange == null) parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ if (mySlice.getRange() != null && !mySlice.getRange().isSubsetOf(parentRange)) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "Request says it is coming from parent shard leader but parent hash range is not superset of my range");
+ }
+ } else {
+ String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
+ if (fromCollection == null) {
+ // the "cause" metadata lets the sender detect a leader change and fail the request
+ log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
+ SolrException solrExc = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+ solrExc.setMetadata("cause", "LeaderChanged");
+ throw solrExc;
+ }
+ }
+ }
+
+ // ZK says we're the leader but the local descriptor doesn't yet; retry up to 5 times
+ // (500ms apart) since the leader publishes to ZK before updating local state
+ int count = 0;
+ while (((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) && count < 5) {
+ count++;
+ // re-getting localIsLeader since we published to ZK first before setting localIsLeader value
+ localIsLeader = cloudDesc.isLeader();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if ((isLeader && !localIsLeader) || (isSubShardLeader && !localIsLeader)) {
+ log.error("ClusterState says we are the leader, but locally we don't think so");
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "ClusterState says we are the leader (" + zkController.getBaseUrl()
+ + "/" + req.getCore().getName() + "), but locally we don't think so. Request came from " + from);
+ }
+ }
+
+ @Override
+ protected void doClose() {
+ // Release the command distributor's resources, if one was created for this request.
+ if (cmdDistrib != null) {
+ cmdDistrib.close();
+ }
+ }
+
+ @Override
+ public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
+ // Reject index merges on read-only collections before delegating to the base implementation.
+ if (isReadOnly()) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+ }
+ super.processMergeIndexes(cmd);
+ }
+
+ @Override
+ public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+ // Reject rollbacks on read-only collections before delegating to the base implementation.
+ if (isReadOnly()) {
+ throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
+ }
+ super.processRollback(cmd);
+ }
+
+ @Override
+ public void finish() throws IOException {
+ // Guard against finishing twice, then complete distribution and surface any errors.
+ assertNotFinished();
+
+ doFinish();
+ }
+
+ // TODO: optionally fail if n replicas are not reached...
+ /**
+ * Completes the distributed update: on the leader, raises shard terms for replicas that
+ * were skipped (so they must recover); waits for the command distributor to finish; then
+ * classifies distribution errors — forward failures and leader-change failures are
+ * reported to the client, while failures on our own replicas (when we are still the
+ * leader) result in those replicas' terms being lowered so they recover. Finally reports
+ * the achieved replication factor if it was tracked.
+ */
+ private void doFinish() {
+ boolean shouldUpdateTerms = isLeader && isIndexChanged;
+ if (shouldUpdateTerms) {
+ ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+ if (skippedCoreNodeNames != null) {
+ zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames);
+ }
+ zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
+ }
+ // TODO: if not a forward and replication req is not specified, we could
+ // send in a background thread
+
+ cmdDistrib.finish();
+ List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
+ // TODO - we may need to tell about more than one error...
+
+ List<SolrCmdDistributor.Error> errorsForClient = new ArrayList<>(errors.size());
+ Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
+ for (final SolrCmdDistributor.Error error : errors) {
+
+ if (error.req.node instanceof SolrCmdDistributor.ForwardNode) {
+ // if it's a forward, any fail is a problem -
+ // otherwise we assume things are fine if we got it locally
+ // until we start allowing min replication param
+ errorsForClient.add(error);
+ continue;
+ }
+
+ // else...
+
+ // for now we don't error - we assume if it was added locally, we
+ // succeeded
+ if (log.isWarnEnabled()) {
+ log.warn("Error sending update to " + error.req.node.getBaseUrl(), error.e);
+ }
+
+ // Since it is not a forward request, for each fail, try to tell them to
+ // recover - the doc was already added locally, so it should have been
+ // legit
+
+ DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
+ if (phase != DistribPhase.FROMLEADER)
+ continue; // don't have non-leaders try to recovery other nodes
+
+ // commits are special -- they can run on any node irrespective of whether it is a leader or not
+ // we don't want to run recovery on a node which missed a commit command
+ if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null)
+ continue;
+
+ final String replicaUrl = error.req.node.getUrl();
+
+ // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
+ String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
+ if ("LeaderChanged".equals(cause)) {
+ // let's just fail this request and let the client retry? or just call processAdd again?
+ log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
+ " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
+ errorsForClient.add(error);
+ continue;
+ }
+
+ // intentionally shadow the processor-level collection/shardId with the failed node's
+ String collection = null;
+ String shardId = null;
+
+ if (error.req.node instanceof SolrCmdDistributor.StdNode) {
+ SolrCmdDistributor.StdNode stdNode = (SolrCmdDistributor.StdNode)error.req.node;
+ collection = stdNode.getCollection();
+ shardId = stdNode.getShardId();
+
+ // before we go setting other replicas to down, make sure we're still the leader!
+ String leaderCoreNodeName = null;
+ Exception getLeaderExc = null;
+ Replica leaderProps = null;
+ try {
+ leaderProps = zkController.getZkStateReader().getLeader(collection, shardId);
+ if (leaderProps != null) {
+ leaderCoreNodeName = leaderProps.getName();
+ }
+ } catch (Exception exc) {
+ getLeaderExc = exc;
+ }
+ if (leaderCoreNodeName == null) {
+ log.warn("Failed to determine if {} is still the leader for collection={} shardId={} " +
+ "before putting {} into leader-initiated recovery",
+ cloudDesc.getCoreNodeName(), collection, shardId, replicaUrl, getLeaderExc);
+ }
+
+ // confirm the failed node is actually one of our own replicas
+ List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader().getReplicaProps(collection,
+ cloudDesc.getShardId(), cloudDesc.getCoreNodeName());
+ boolean foundErrorNodeInReplicaList = false;
+ if (myReplicas != null) {
+ for (ZkCoreNodeProps replicaProp : myReplicas) {
+ if (((Replica) replicaProp.getNodeProps()).getName().equals(((Replica)stdNode.getNodeProps().getNodeProps()).getName())) {
+ foundErrorNodeInReplicaList = true;
+ break;
+ }
+ }
+ }
+
+ if (leaderCoreNodeName != null && cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) // we are still same leader
+ && foundErrorNodeInReplicaList // we found an error for one of replicas
+ && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
+ try {
+ String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
+ // if false, then the node is probably not "live" anymore
+ // and we do not need to send a recovery message
+ Throwable rootCause = SolrException.getRootCause(error.e);
+ log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
+ replicasShouldBeInLowerTerms.add(coreNodeName);
+ } catch (Exception exc) {
+ Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
+ log.error("Leader failed to set replica " +
+ error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
+ }
+ } else {
+ // not the leader anymore maybe or the error'd node is not my replica?
+ if (!foundErrorNodeInReplicaList) {
+ log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
+ cloudDesc.getShardId()+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
+ "No request recovery command will be sent!");
+ if (!shardId.equals(cloudDesc.getShardId())) {
+ // some replicas on other shard did not receive the updates (ex: during splitshard),
+ // exception must be notified to clients
+ errorsForClient.add(error);
+ }
+ } else {
+ log.warn("Core " + cloudDesc.getCoreNodeName() + " is no longer the leader for " + collection + " "
+ + shardId + " or we tried to put ourself into LIR, no request recovery command will be sent!");
+ }
+ }
+ }
+ }
+ // lower the terms of failed replicas in one batch so they initiate recovery
+ if (!replicasShouldBeInLowerTerms.isEmpty()) {
+ zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
+ .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms);
+ }
+ handleReplicationFactor();
+ if (0 < errorsForClient.size()) {
+ throw new DistributedUpdatesAsyncException(errorsForClient);
+ }
+ }
+
+ /**
+ * If necessary, include in the response the achieved replication factor.
+ * The leader tracker's value is folded into the rollup tracker (when both exist); the
+ * rollup value wins for the reported "rf". Both trackers are cleared afterwards so a
+ * subsequent batch starts fresh.
+ */
+ @SuppressWarnings("deprecation")
+ private void handleReplicationFactor() {
+ if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
+ int achievedRf = Integer.MAX_VALUE;
+
+ if (leaderReplicationTracker != null) {
+
+ achievedRf = leaderReplicationTracker.getAchievedRf();
+
+ // Transfer this to the rollup tracker if it exists
+ if (rollupReplicationTracker != null) {
+ rollupReplicationTracker.testAndSetAchievedRf(achievedRf);
+ }
+ }
+
+ // Rollup tracker has accumulated stats.
+ if (rollupReplicationTracker != null) {
+ achievedRf = rollupReplicationTracker.getAchievedRf();
+ }
+ if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
+ // Unused, but kept for back compatibility. To be removed in Solr 9
+ rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, Integer.parseInt(req.getParams().get(UpdateRequest.MIN_REPFACT)));
+ }
+ rsp.getResponseHeader().add(UpdateRequest.REPFACT, achievedRf);
+ rollupReplicationTracker = null;
+ leaderReplicationTracker = null;
+
+ }
+ }
+
+ /**
+ * Fail fast when updates should not be processed: the CoreContainer is shutting down,
+ * or (except during log replay / peer sync, which don't need ZK) the ZooKeeper session
+ * is likely expired.
+ */
+ private void zkCheck() {
+
+ // Streaming updates can delay shutdown and cause big update reorderings (new streams can't be
+ // initiated, but existing streams carry on). This is why we check if the CC is shutdown.
+ // See SOLR-8203 and loop HdfsChaosMonkeyNothingIsSafeTest (and check for inconsistent shards) to test.
+ if (req.getCore().getCoreContainer().isShutDown()) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is shutting down.");
+ }
+
+ if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+ // for log reply or peer sync, we don't need to be connected to ZK
+ return;
+ }
+
+ if (!zkController.getZkClient().getConnectionManager().isLikelyExpired()) {
+ return;
+ }
+
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java
index ef9f8de..125724b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java
@@ -368,7 +368,8 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
return true;
}
// if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
- return !distribProc.isLeader(cmd);
+ distribProc.setupRequest(cmd);
+ return !distribProc.isLeader();
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java
index ca31897..7fc33d7 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java
@@ -216,7 +216,8 @@ public class SkipExistingDocumentsProcessorFactory extends UpdateRequestProcesso
if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) {
return false;
}
- return distribProc.isLeader(cmd);
+ distribProc.setupRequest(cmd);
+ return distribProc.isLeader();
}
@Override
diff --git a/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
index 7d377fa..2a20846 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/AtomicUpdateProcessorFactoryTest.java
@@ -232,7 +232,7 @@ public class AtomicUpdateProcessorFactoryTest extends SolrTestCaseJ4 {
try {
factory.getInstance(cmd.getReq(), new SolrQueryResponse(),
- new DistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
+ createDistributedUpdateProcessor(cmd.getReq(), new SolrQueryResponse(),
new RunUpdateProcessor(cmd.getReq(), null))).processAdd(cmd);
} catch (IOException e) {
}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index 8f56d68..a4c54d1 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -34,11 +34,10 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
}
@Test
- public void testShouldBufferUpdate() {
+ public void testShouldBufferUpdateZk() {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
req, null, null, null);
-
AddUpdateCommand cmd = new AddUpdateCommand(req);
// applying buffer updates, isReplayOrPeerSync flag doesn't matter
assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
@@ -50,5 +49,4 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
}
-
}
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 18b6ec1..7e1ee0a 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -131,6 +131,9 @@ import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.DirectSolrConnection;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedZkUpdateProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.RandomizeSSL;
import org.apache.solr.util.RandomizeSSL.SSLRandomizer;
@@ -2891,6 +2894,14 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
}
}
+
+ public static DistributedUpdateProcessor createDistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp,
+ UpdateRequestProcessor next) {
+ if(h.getCoreContainer().isZooKeeperAware()) {
+ return new DistributedZkUpdateProcessor(req, rsp, next);
+ }
+ return new DistributedUpdateProcessor(req, rsp, next);
+ }
/**
* Cleans up the randomized sysproperties and variables set by {@link #randomizeNumericTypesProperties}