You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by il...@apache.org on 2024/02/19 19:45:30 UTC
(solr) branch main updated: SOLR-16995: Configure replication behaviour for each replica type (#2207)
This is an automated email from the ASF dual-hosted git repository.
ilan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 2ec0cbff130 SOLR-16995: Configure replication behaviour for each replica type (#2207)
2ec0cbff130 is described below
commit 2ec0cbff1305c6dc5870664a6ba7e67a214737f9
Author: Vincent Primault <vi...@gmail.com>
AuthorDate: Mon Feb 19 20:45:24 2024 +0100
SOLR-16995: Configure replication behaviour for each replica type (#2207)
---
.../org/apache/solr/cloud/CloudDescriptor.java | 4 ---
.../org/apache/solr/cloud/RecoveryStrategy.java | 24 ++++++++++--------
.../org/apache/solr/cloud/ReplicateFromLeader.java | 29 +++-------------------
.../solr/cloud/ShardLeaderElectionContext.java | 4 ++-
.../java/org/apache/solr/cloud/ZkController.java | 24 ++++++++++--------
.../java/org/apache/solr/core/CoreContainer.java | 15 +++++------
.../apache/solr/handler/ReplicationHandler.java | 2 ++
.../java/org/apache/solr/update/UpdateHandler.java | 7 ++++--
.../java/org/apache/solr/common/cloud/Replica.java | 29 +++++++++++++++++++---
9 files changed, 71 insertions(+), 67 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
index 31be35cc752..f26819aa021 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
@@ -70,10 +70,6 @@ public class CloudDescriptor {
}
}
- public boolean requiresTransactionLog() {
- return this.replicaType != Replica.Type.PULL;
- }
-
public Replica.State getLastPublished() {
return lastPublished;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index f26672b1960..9ea837378d7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -336,14 +336,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
// we can lose our core descriptor, so store it now
this.coreDescriptor = core.getCoreDescriptor();
- if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
+ if (this.coreDescriptor.getCloudDescriptor().getReplicaType().requireTransactionLog) {
doSyncOrReplicateRecovery(core);
} else {
doReplicateOnlyRecovery(core);
}
}
- private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
+ private void doReplicateOnlyRecovery(SolrCore core) {
final RTimer timer = new RTimer();
boolean successfulRecovery = false;
@@ -396,8 +396,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Starting Replication Recovery.");
try {
- log.info("Stopping background replicate from leader process");
- zkController.stopReplicationFromLeader(coreName);
+ if (replicaType.replicateFromLeader) {
+ log.info("Stopping background replicate from leader process");
+ zkController.stopReplicationFromLeader(coreName);
+ }
replicate(zkController.getNodeName(), core, leaderprops);
if (isClosed()) {
@@ -417,8 +419,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.error("Error while trying to recover. core={}", coreName, e);
} finally {
if (successfulRecovery) {
- log.info("Restarting background replicate from leader process");
- zkController.startReplicationFromLeader(coreName, false);
+ if (replicaType.replicateFromLeader) {
+ log.info("Restarting background replicate from leader process");
+ zkController.startReplicationFromLeader(coreName, false);
+ }
log.info("Registering as Active after recovery.");
try {
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
@@ -583,7 +587,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
- if (replicaType == Replica.Type.TLOG) {
+ if (replicaType.replicateFromLeader) {
zkController.stopReplicationFromLeader(coreName);
}
@@ -735,7 +739,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (successfulRecovery) {
log.info("Registering as Active after recovery.");
try {
- if (replicaType == Replica.Type.TLOG) {
+ if (replicaType.replicateFromLeader) {
zkController.startReplicationFromLeader(coreName, true);
}
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
@@ -770,8 +774,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
* @param ourUrl if the leader url is the same as our url, we will skip trying to connect
* @return the leader replica, or null if closed
*/
- private final Replica pingLeader(
- String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+ private Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
+ throws Exception {
int numTried = 0;
while (true) {
CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 7abe46fe0c3..43390e63e9b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -20,7 +20,6 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
@@ -88,31 +87,9 @@ public class ReplicateFromLeader {
log.info("Will start replication from leader with poll interval: {}", pollIntervalStr);
NamedList<Object> followerConfig = new NamedList<>();
- followerConfig.add("fetchFromLeader", Boolean.TRUE);
-
- // don't commit on leader version zero for PULL replicas as PULL should only get its index
- // state from leader
- boolean skipCommitOnLeaderVersionZero = switchTransactionLog;
- if (!skipCommitOnLeaderVersionZero) {
- CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
- if (cloudDescriptor != null) {
- Replica replica =
- cc.getZkController()
- .getZkStateReader()
- .getCollection(cloudDescriptor.getCollectionName())
- .getSlice(cloudDescriptor.getShardId())
- .getReplica(cloudDescriptor.getCoreNodeName());
- if (replica != null && replica.getType() == Replica.Type.PULL) {
- // only set this to true if we're a PULL replica, otherwise use value of
- // switchTransactionLog
- skipCommitOnLeaderVersionZero = true;
- }
- }
- }
- followerConfig.add(
- ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, skipCommitOnLeaderVersionZero);
-
- followerConfig.add("pollInterval", pollIntervalStr);
+ followerConfig.add(ReplicationHandler.FETCH_FROM_LEADER, Boolean.TRUE);
+ followerConfig.add(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, Boolean.TRUE);
+ followerConfig.add(ReplicationHandler.POLL_INTERVAL, pollIntervalStr);
NamedList<Object> replicationConfig = new NamedList<>();
replicationConfig.add("follower", followerConfig);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index e1768e0cd37..116aee57d3f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -270,9 +270,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (!isClosed) {
try {
- if (replicaType == Replica.Type.TLOG) {
+ if (replicaType.replicateFromLeader) {
// stop replicate from old leader
zkController.stopReplicationFromLeader(coreName);
+ }
+ if (replicaType == Replica.Type.TLOG) {
if (weAreReplacement) {
try (SolrCore core = cc.getCore(coreName)) {
Future<UpdateLog.RecoveryInfo> future =
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 51aa6e024c8..90442818718 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -78,7 +78,6 @@ import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Replica.Type;
import org.apache.solr.common.cloud.SecurityAwareZkACLProvider;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -1307,16 +1306,19 @@ public class ZkController implements Closeable {
boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
if (replica.getType().leaderEligible) {
joinElection(desc, afterExpiration, joinAtHead);
- } else if (replica.getType() == Type.PULL) {
+ } else {
if (joinAtHead) {
log.warn(
- "Replica {} was designated as preferred leader but it's type is {}, It won't join election",
+ "Replica {} was designated as preferred leader but its type is {}, It won't join election",
coreZkNodeName,
- Type.PULL);
+ replica.getType());
+ }
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Replica {} skipping election because its type is {}",
+ coreZkNodeName,
+ replica.getType());
}
- log.debug(
- "Replica {} skipping election because it's type is {}", coreZkNodeName, Type.PULL);
- startReplicationFromLeader(coreName, false);
}
} catch (InterruptedException e) {
// Restore the interrupted status
@@ -1391,8 +1393,8 @@ public class ZkController implements Closeable {
cc,
afterExpiration);
if (!didRecovery) {
- if (isTlogReplicaAndNotLeader) {
- startReplicationFromLeader(coreName, true);
+ if (replica.getType().replicateFromLeader && !isLeader) {
+ startReplicationFromLeader(coreName, replica.getType().requireTransactionLog);
}
publish(desc, Replica.State.ACTIVE);
}
@@ -2400,7 +2402,7 @@ public class ZkController implements Closeable {
try (SolrCore core = cc.getCore(coreName)) {
Replica.Type replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
- if (replicaType == Type.TLOG) {
+ if (replicaType.replicateFromLeader) {
String leaderUrl =
getLeader(
core.getCoreDescriptor().getCloudDescriptor(), cloudConfig.getLeaderVoteWait());
@@ -2408,7 +2410,7 @@ public class ZkController implements Closeable {
// restart the replication thread to ensure the replication is running in each new
// replica especially if previous role is "leader" (i.e., no replication thread)
stopReplicationFromLeader(coreName);
- startReplicationFromLeader(coreName, true);
+ startReplicationFromLeader(coreName, replicaType.requireTransactionLog);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8af5ecc272e..3d66853a781 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -2051,15 +2051,13 @@ public class CoreContainer {
if (docCollection != null) {
Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
assert replica != null : cd.getCloudDescriptor().getCoreNodeName() + " had no replica";
- if (replica.getType() == Replica.Type.TLOG) { // TODO: needed here?
+ if (replica.getType().replicateFromLeader) {
getZkController().stopReplicationFromLeader(core.getName());
if (!cd.getCloudDescriptor().isLeader()) {
- getZkController().startReplicationFromLeader(newCore.getName(), true);
+ getZkController()
+ .startReplicationFromLeader(
+ newCore.getName(), replica.getType().requireTransactionLog);
}
-
- } else if (replica.getType() == Replica.Type.PULL) {
- getZkController().stopReplicationFromLeader(core.getName());
- getZkController().startReplicationFromLeader(newCore.getName(), false);
}
}
success = true;
@@ -2167,9 +2165,8 @@ public class CoreContainer {
if (zkSys.getZkController() != null) {
// cancel recovery in cloud mode
core.getSolrCoreState().cancelRecovery();
- if (cd.getCloudDescriptor().getReplicaType() == Replica.Type.PULL
- || cd.getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
- // Stop replication if this is part of a pull/tlog replica before closing the core
+ if (cd.getCloudDescriptor().getReplicaType().replicateFromLeader) {
+ // Stop replication before closing the core
zkSys.getZkController().stopReplicationFromLeader(name);
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index a5451dc1b2b..02d6745b234 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -1801,6 +1801,8 @@ public class ReplicationHandler extends RequestHandlerBase
// in case of TLOG replica, if leaderVersion = zero, don't do commit
// otherwise updates from current tlog won't copied over properly to the new tlog, leading to data
// loss
+ // don't commit on leader version zero for PULL replicas as PULL should only get its index
+ // state from leader
public static final String SKIP_COMMIT_ON_LEADER_VERSION_ZERO = "skipCommitOnLeaderVersionZero";
/**
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index cfcd32fb4cf..f0ade81e090 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -116,10 +116,13 @@ public abstract class UpdateHandler implements SolrInfoBean {
parseEventListeners();
PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
- // If this is a replica of type PULL, don't create the update log
+ // If this replica doesn't require a transaction log, don't create it
boolean skipUpdateLog =
core.getCoreDescriptor().getCloudDescriptor() != null
- && !core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog();
+ && !core.getCoreDescriptor()
+ .getCloudDescriptor()
+ .getReplicaType()
+ .requireTransactionLog;
if (updateLog == null
&& ulogPluginInfo != null
&& ulogPluginInfo.isEnabled()
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index abeef4a9fc6..0d4cd3afffb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -104,27 +104,48 @@ public class Replica extends ZkNodeProps implements MapWriter {
* support NRT (soft commits) and RTG. Any {@link Type#NRT} replica can become a leader. A shard
* leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas.
*/
- NRT(true, CollectionAdminParams.NRT_REPLICAS),
+ NRT(true, true, false, CollectionAdminParams.NRT_REPLICAS),
/**
* Writes to transaction log, but not to index, uses replication. Any {@link Type#TLOG} replica
* can become leader (by first applying all local transaction log elements). If a replica is of
* type {@link Type#TLOG} but is also the leader, it will behave as a {@link Type#NRT}. A shard
* leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas.
*/
- TLOG(true, CollectionAdminParams.TLOG_REPLICAS),
+ TLOG(true, true, true, CollectionAdminParams.TLOG_REPLICAS),
/**
* Doesn’t index or writes to transaction log. Just replicates from {@link Type#NRT} or {@link
* Type#TLOG} replicas. {@link Type#PULL} replicas can’t become shard leaders (i.e., if there
* are only pull replicas in the collection at some point, updates will fail same as if there is
* no leaders, queries continue to work), so they don’t even participate in elections.
*/
- PULL(false, CollectionAdminParams.PULL_REPLICAS);
+ PULL(false, false, true, CollectionAdminParams.PULL_REPLICAS);
+ /** Whether replicas of this type join the leader election and can be elected. */
public final boolean leaderEligible;
+
+ /**
+ * Whether replicas of this type require a transaction log. A transaction log will be created
+ * only if this is {@code true}.
+ */
+ public final boolean requireTransactionLog;
+
+ /**
+ * Whether replicas of this type continuously replicate from the leader, if they are not
+ * themselves the leader.
+ */
+ public final boolean replicateFromLeader;
+
+ /** Name of the property in messages that contains the number of replicas of this type. */
public final String numReplicasPropertyName;
- Type(boolean leaderEligible, String numReplicasPropertyName) {
+ Type(
+ boolean leaderEligible,
+ boolean requireTransactionLog,
+ boolean replicateFromLeader,
+ String numReplicasPropertyName) {
this.leaderEligible = leaderEligible;
+ this.requireTransactionLog = requireTransactionLog;
+ this.replicateFromLeader = replicateFromLeader;
this.numReplicasPropertyName = numReplicasPropertyName;
}