You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by il...@apache.org on 2024/02/19 19:45:30 UTC

(solr) branch main updated: SOLR-16995: Configure replication behaviour for each replica type (#2207)

This is an automated email from the ASF dual-hosted git repository.

ilan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ec0cbff130 SOLR-16995: Configure replication behaviour for each replica type (#2207)
2ec0cbff130 is described below

commit 2ec0cbff1305c6dc5870664a6ba7e67a214737f9
Author: Vincent Primault <vi...@gmail.com>
AuthorDate: Mon Feb 19 20:45:24 2024 +0100

    SOLR-16995: Configure replication behaviour for each replica type (#2207)
---
 .../org/apache/solr/cloud/CloudDescriptor.java     |  4 ---
 .../org/apache/solr/cloud/RecoveryStrategy.java    | 24 ++++++++++--------
 .../org/apache/solr/cloud/ReplicateFromLeader.java | 29 +++-------------------
 .../solr/cloud/ShardLeaderElectionContext.java     |  4 ++-
 .../java/org/apache/solr/cloud/ZkController.java   | 24 ++++++++++--------
 .../java/org/apache/solr/core/CoreContainer.java   | 15 +++++------
 .../apache/solr/handler/ReplicationHandler.java    |  2 ++
 .../java/org/apache/solr/update/UpdateHandler.java |  7 ++++--
 .../java/org/apache/solr/common/cloud/Replica.java | 29 +++++++++++++++++++---
 9 files changed, 71 insertions(+), 67 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
index 31be35cc752..f26819aa021 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
@@ -70,10 +70,6 @@ public class CloudDescriptor {
     }
   }
 
-  public boolean requiresTransactionLog() {
-    return this.replicaType != Replica.Type.PULL;
-  }
-
   public Replica.State getLastPublished() {
     return lastPublished;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index f26672b1960..9ea837378d7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -336,14 +336,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
     // we can lose our core descriptor, so store it now
     this.coreDescriptor = core.getCoreDescriptor();
 
-    if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
+    if (this.coreDescriptor.getCloudDescriptor().getReplicaType().requireTransactionLog) {
       doSyncOrReplicateRecovery(core);
     } else {
       doReplicateOnlyRecovery(core);
     }
   }
 
-  private final void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
+  private void doReplicateOnlyRecovery(SolrCore core) {
     final RTimer timer = new RTimer();
     boolean successfulRecovery = false;
 
@@ -396,8 +396,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
         log.info("Starting Replication Recovery.");
 
         try {
-          log.info("Stopping background replicate from leader process");
-          zkController.stopReplicationFromLeader(coreName);
+          if (replicaType.replicateFromLeader) {
+            log.info("Stopping background replicate from leader process");
+            zkController.stopReplicationFromLeader(coreName);
+          }
           replicate(zkController.getNodeName(), core, leaderprops);
 
           if (isClosed()) {
@@ -417,8 +419,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
         log.error("Error while trying to recover. core={}", coreName, e);
       } finally {
         if (successfulRecovery) {
-          log.info("Restarting background replicate from leader process");
-          zkController.startReplicationFromLeader(coreName, false);
+          if (replicaType.replicateFromLeader) {
+            log.info("Restarting background replicate from leader process");
+            zkController.startReplicationFromLeader(coreName, false);
+          }
           log.info("Registering as Active after recovery.");
           try {
             zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
@@ -583,7 +587,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       }
     }
 
-    if (replicaType == Replica.Type.TLOG) {
+    if (replicaType.replicateFromLeader) {
       zkController.stopReplicationFromLeader(coreName);
     }
 
@@ -735,7 +739,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         if (successfulRecovery) {
           log.info("Registering as Active after recovery.");
           try {
-            if (replicaType == Replica.Type.TLOG) {
+            if (replicaType.replicateFromLeader) {
               zkController.startReplicationFromLeader(coreName, true);
             }
             zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
@@ -770,8 +774,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
    * @param ourUrl if the leader url is the same as our url, we will skip trying to connect
    * @return the leader replica, or null if closed
    */
-  private final Replica pingLeader(
-      String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+  private Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
+      throws Exception {
     int numTried = 0;
     while (true) {
       CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 7abe46fe0c3..43390e63e9b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -20,7 +20,6 @@ package org.apache.solr.cloud;
 import java.lang.invoke.MethodHandles;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreContainer;
@@ -88,31 +87,9 @@ public class ReplicateFromLeader {
       log.info("Will start replication from leader with poll interval: {}", pollIntervalStr);
 
       NamedList<Object> followerConfig = new NamedList<>();
-      followerConfig.add("fetchFromLeader", Boolean.TRUE);
-
-      // don't commit on leader version zero for PULL replicas as PULL should only get its index
-      // state from leader
-      boolean skipCommitOnLeaderVersionZero = switchTransactionLog;
-      if (!skipCommitOnLeaderVersionZero) {
-        CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
-        if (cloudDescriptor != null) {
-          Replica replica =
-              cc.getZkController()
-                  .getZkStateReader()
-                  .getCollection(cloudDescriptor.getCollectionName())
-                  .getSlice(cloudDescriptor.getShardId())
-                  .getReplica(cloudDescriptor.getCoreNodeName());
-          if (replica != null && replica.getType() == Replica.Type.PULL) {
-            // only set this to true if we're a PULL replica, otherwise use value of
-            // switchTransactionLog
-            skipCommitOnLeaderVersionZero = true;
-          }
-        }
-      }
-      followerConfig.add(
-          ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, skipCommitOnLeaderVersionZero);
-
-      followerConfig.add("pollInterval", pollIntervalStr);
+      followerConfig.add(ReplicationHandler.FETCH_FROM_LEADER, Boolean.TRUE);
+      followerConfig.add(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, Boolean.TRUE);
+      followerConfig.add(ReplicationHandler.POLL_INTERVAL, pollIntervalStr);
       NamedList<Object> replicationConfig = new NamedList<>();
       replicationConfig.add("follower", followerConfig);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index e1768e0cd37..116aee57d3f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -270,9 +270,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
 
       if (!isClosed) {
         try {
-          if (replicaType == Replica.Type.TLOG) {
+          if (replicaType.replicateFromLeader) {
             // stop replicate from old leader
             zkController.stopReplicationFromLeader(coreName);
+          }
+          if (replicaType == Replica.Type.TLOG) {
             if (weAreReplacement) {
               try (SolrCore core = cc.getCore(coreName)) {
                 Future<UpdateLog.RecoveryInfo> future =
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 51aa6e024c8..90442818718 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -78,7 +78,6 @@ import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Replica.Type;
 import org.apache.solr.common.cloud.SecurityAwareZkACLProvider;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -1307,16 +1306,19 @@ public class ZkController implements Closeable {
         boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
         if (replica.getType().leaderEligible) {
           joinElection(desc, afterExpiration, joinAtHead);
-        } else if (replica.getType() == Type.PULL) {
+        } else {
           if (joinAtHead) {
             log.warn(
-                "Replica {} was designated as preferred leader but it's type is {}, It won't join election",
+                "Replica {} was designated as preferred leader but its type is {}, It won't join election",
                 coreZkNodeName,
-                Type.PULL);
+                replica.getType());
+          }
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Replica {} skipping election because its type is {}",
+                coreZkNodeName,
+                replica.getType());
           }
-          log.debug(
-              "Replica {} skipping election because it's type is {}", coreZkNodeName, Type.PULL);
-          startReplicationFromLeader(coreName, false);
         }
       } catch (InterruptedException e) {
         // Restore the interrupted status
@@ -1391,8 +1393,8 @@ public class ZkController implements Closeable {
                 cc,
                 afterExpiration);
         if (!didRecovery) {
-          if (isTlogReplicaAndNotLeader) {
-            startReplicationFromLeader(coreName, true);
+          if (replica.getType().replicateFromLeader && !isLeader) {
+            startReplicationFromLeader(coreName, replica.getType().requireTransactionLog);
           }
           publish(desc, Replica.State.ACTIVE);
         }
@@ -2400,7 +2402,7 @@ public class ZkController implements Closeable {
 
       try (SolrCore core = cc.getCore(coreName)) {
         Replica.Type replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
-        if (replicaType == Type.TLOG) {
+        if (replicaType.replicateFromLeader) {
           String leaderUrl =
               getLeader(
                   core.getCoreDescriptor().getCloudDescriptor(), cloudConfig.getLeaderVoteWait());
@@ -2408,7 +2410,7 @@ public class ZkController implements Closeable {
             // restart the replication thread to ensure the replication is running in each new
             // replica especially if previous role is "leader" (i.e., no replication thread)
             stopReplicationFromLeader(coreName);
-            startReplicationFromLeader(coreName, true);
+            startReplicationFromLeader(coreName, replicaType.requireTransactionLog);
           }
         }
       }
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8af5ecc272e..3d66853a781 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -2051,15 +2051,13 @@ public class CoreContainer {
         if (docCollection != null) {
           Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
           assert replica != null : cd.getCloudDescriptor().getCoreNodeName() + " had no replica";
-          if (replica.getType() == Replica.Type.TLOG) { // TODO: needed here?
+          if (replica.getType().replicateFromLeader) {
             getZkController().stopReplicationFromLeader(core.getName());
             if (!cd.getCloudDescriptor().isLeader()) {
-              getZkController().startReplicationFromLeader(newCore.getName(), true);
+              getZkController()
+                  .startReplicationFromLeader(
+                      newCore.getName(), replica.getType().requireTransactionLog);
             }
-
-          } else if (replica.getType() == Replica.Type.PULL) {
-            getZkController().stopReplicationFromLeader(core.getName());
-            getZkController().startReplicationFromLeader(newCore.getName(), false);
           }
         }
         success = true;
@@ -2167,9 +2165,8 @@ public class CoreContainer {
     if (zkSys.getZkController() != null) {
       // cancel recovery in cloud mode
       core.getSolrCoreState().cancelRecovery();
-      if (cd.getCloudDescriptor().getReplicaType() == Replica.Type.PULL
-          || cd.getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
-        // Stop replication if this is part of a pull/tlog replica before closing the core
+      if (cd.getCloudDescriptor().getReplicaType().replicateFromLeader) {
+        // Stop replication before closing the core
         zkSys.getZkController().stopReplicationFromLeader(name);
       }
     }
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index a5451dc1b2b..02d6745b234 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -1801,6 +1801,8 @@ public class ReplicationHandler extends RequestHandlerBase
   // in case of TLOG replica, if leaderVersion = zero, don't do commit
   // otherwise updates from current tlog won't copied over properly to the new tlog, leading to data
   // loss
+  // don't commit on leader version zero for PULL replicas as PULL should only get its index
+  // state from leader
   public static final String SKIP_COMMIT_ON_LEADER_VERSION_ZERO = "skipCommitOnLeaderVersionZero";
 
   /**
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index cfcd32fb4cf..f0ade81e090 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -116,10 +116,13 @@ public abstract class UpdateHandler implements SolrInfoBean {
     parseEventListeners();
     PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
 
-    // If this is a replica of type PULL, don't create the update log
+    // If this replica doesn't require a transaction log, don't create it
     boolean skipUpdateLog =
         core.getCoreDescriptor().getCloudDescriptor() != null
-            && !core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog();
+            && !core.getCoreDescriptor()
+                .getCloudDescriptor()
+                .getReplicaType()
+                .requireTransactionLog;
     if (updateLog == null
         && ulogPluginInfo != null
         && ulogPluginInfo.isEnabled()
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index abeef4a9fc6..0d4cd3afffb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -104,27 +104,48 @@ public class Replica extends ZkNodeProps implements MapWriter {
      * support NRT (soft commits) and RTG. Any {@link Type#NRT} replica can become a leader. A shard
      * leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas.
      */
-    NRT(true, CollectionAdminParams.NRT_REPLICAS),
+    NRT(true, true, false, CollectionAdminParams.NRT_REPLICAS),
     /**
      * Writes to transaction log, but not to index, uses replication. Any {@link Type#TLOG} replica
      * can become leader (by first applying all local transaction log elements). If a replica is of
      * type {@link Type#TLOG} but is also the leader, it will behave as a {@link Type#NRT}. A shard
      * leader will forward updates to all active {@link Type#NRT} and {@link Type#TLOG} replicas.
      */
-    TLOG(true, CollectionAdminParams.TLOG_REPLICAS),
+    TLOG(true, true, true, CollectionAdminParams.TLOG_REPLICAS),
     /**
      * Doesn’t index or writes to transaction log. Just replicates from {@link Type#NRT} or {@link
      * Type#TLOG} replicas. {@link Type#PULL} replicas can’t become shard leaders (i.e., if there
      * are only pull replicas in the collection at some point, updates will fail same as if there is
      * no leaders, queries continue to work), so they don’t even participate in elections.
      */
-    PULL(false, CollectionAdminParams.PULL_REPLICAS);
+    PULL(false, false, true, CollectionAdminParams.PULL_REPLICAS);
 
+    /** Whether replicas of this type join the leader election and can be elected. */
     public final boolean leaderEligible;
+
+    /**
+     * Whether replicas of this type require a transaction log. A transaction log will be created
+     * only if this is {@code true}.
+     */
+    public final boolean requireTransactionLog;
+
+    /**
+     * Whether replicas of this type continuously replicate from the leader, if they are not
+     * themselves the leader.
+     */
+    public final boolean replicateFromLeader;
+
+    /** Name of the property in messages that contains the number of replicas of this type. */
     public final String numReplicasPropertyName;
 
-    Type(boolean leaderEligible, String numReplicasPropertyName) {
+    Type(
+        boolean leaderEligible,
+        boolean requireTransactionLog,
+        boolean replicateFromLeader,
+        String numReplicasPropertyName) {
       this.leaderEligible = leaderEligible;
+      this.requireTransactionLog = requireTransactionLog;
+      this.replicateFromLeader = replicateFromLeader;
       this.numReplicasPropertyName = numReplicasPropertyName;
     }