You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/11 03:42:44 UTC

[lucene-solr] branch reference_impl_dev updated: @1158 Simplify state update writing.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new ec23836  @1158 Simplify state update writing.
ec23836 is described below

commit ec23836f25c4eddd8fcae10b8f975a404a4dcb79
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Nov 10 21:40:05 2020 -0600

    @1158 Simplify state update writing.
---
 .../solr/cloud/OverseerTaskExecutorTask.java       |   4 +-
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  60 +-----
 .../solr/cloud/api/collections/MoveReplicaCmd.java |   2 +-
 .../OverseerCollectionMessageHandler.java          |   4 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 213 +++++++++++----------
 .../apache/solr/common/cloud/ZkStateReader.java    |   4 +-
 7 files changed, 124 insertions(+), 165 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
index 8e95c17..2277034 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskExecutorTask.java
@@ -73,10 +73,10 @@ public class OverseerTaskExecutorTask implements Runnable {
     ClusterState cs = zkStateWriter.getClusterstate(true);
 
     log.info("Process message {} {}", message, operation);
-    ClusterState newClusterState = processMessage(message, operation, cs);
+    // processMessage() is no longer applied here: with stateUpdate == true, ZkStateWriter.enqueueUpdate applies the message itself.
 
     log.info("Enqueue message {}", operation);
-    zkStateWriter.enqueueUpdate(newClusterState, true);
+    zkStateWriter.enqueueUpdate(null, message, true); // a null cluster state is only accepted because stateUpdate == true
 
 
     if (log.isDebugEnabled()) log.debug("State update consumed from queue {}", message);
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d6ceeb3..1f88d0f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -625,6 +625,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     }
 
     if (replicaType == Replica.Type.TLOG) {
+      log.info("Stopping replication from leader for {}", coreName);
       zkController.stopReplicationFromLeader(coreName);
     }
 
@@ -635,16 +636,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
                                                                                             // though
       try {
         CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
-        final Replica leader = getLeader(ourUrl, this.coreDescriptor, true);
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
-        if (isLeader && !cloudDesc.isLeader() && leader.getState().equals(Replica.State.ACTIVE)) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
-        }
+        final Replica leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 15000); // NOTE(review): the isClosed() bail-out that followed the old leader lookup was removed; confirm a close during this wait is still honored
 
         log.info("Begin buffering updates. core=[{}]", coreName);
         // recalling buffer updates will drop the old buffer tlog
@@ -703,6 +695,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           if (syncSuccess) {
             SolrQueryRequest req = new LocalSolrQueryRequest(core,
                 new ModifiableSolrParams());
+            log.info("PeerSync was successful, commit to force open a new searcher");
             // force open a new searcher
             core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
             req.close();
@@ -734,23 +727,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
           log.info("Replication Recovery was successful.");
           successfulRecovery = true;
         } catch (InterruptedException | AlreadyClosedException e) {
           ParWork.propagateInterrupt(e, true);
+          log.info("Interrupted or already closed, bailing on recovery"); // propagateInterrupt above is kept so the interrupt is not swallowed before returning
           return;
-        } catch (NullPointerException e) {
-          if (log.isDebugEnabled()) log.debug("NullPointerException", e);
-          break;
         } catch (Exception e) {
           SolrException.log(log, "Error while trying to recover", e);
         }
 
       } catch (Exception e) {
-        if (core.getCoreContainer().isShutDown()) {
-          break;
-        }
         SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
       } finally {
         if (successfulRecovery) {
-          log.info("Registering as Active after recovery.");
+          log.info("Registering as Active after recovery {}", coreName);
           try {
             if (replicaType == Replica.Type.TLOG) {
               zkController.startReplicationFromLeader(coreName, true);
@@ -768,6 +756,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
             close = true;
             recoveryListener.recovered();
           }
+        } else {
+          log.info("Recovery was not successful, will not register as ACTIVE {}", coreName);
         }
       }
 
@@ -829,39 +819,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
     log.info("Finished recovery process, successful=[{}]", successfulRecovery);
   }
 
-  private final Replica getLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
-      throws Exception {
-    int numTried = 0;
-    while (true) {
-      CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
-      DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
-      if (!isClosed() && mayPutReplicaAsDown && numTried == 1 && docCollection.getReplica(coreDesc.getName()).getState() == Replica.State.ACTIVE) {
-        // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
-        // zkController.publish(coreDesc, Replica.State.DOWN);
-        // TODO: We should be in recovery and ignored by queries?
-      }
-      numTried++;
-
-      if (numTried > 3) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Could not get leader");
-        // instead of hammering on the leader,
-        // let recovery process continue normally
-      }
-
-      Replica leaderReplica = null;
-
-      try {
-        leaderReplica = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
-      } catch (SolrException e) {
-        Thread.sleep(500);
-        log.info("Could not find leader, looping again ...", e);
-        continue;
-      }
-
-      return leaderReplica;
-    }
-  }
-
   public static Runnable testing_beforeReplayBufferingUpdates;
 
     final private void replay(SolrCore core)
@@ -926,7 +883,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
   }
 
   final public boolean isClosed() {
-    return close || cc.isShutDown();
+    return close; // NOTE(review): cc.isShutDown() is no longer consulted; confirm close is always set when the container shuts down
   }
 
   final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
index 20bcfd9..691e267 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -290,7 +290,7 @@ public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
     AddReplicaCmd.Response response = ocmh.addReplicaWithResp(clusterState, addReplicasProps, addResult);
 
-    ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState, false);
+    ocmh.overseer.getZkStateWriter().enqueueUpdate(response.clusterState, null, false); // full cluster-state update: no message needed
     ocmh.overseer.writePendingUpdates();
 
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 27a6b9f..b901efc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -271,7 +271,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         if (log.isDebugEnabled()) log.debug("Command returned clusterstate={} results={}", responce.clusterState, results);
 
         if (responce.clusterState != null) {
-          overseer.getZkStateWriter().enqueueUpdate(responce.clusterState, false);
+          overseer.getZkStateWriter().enqueueUpdate(responce.clusterState, null, false);
 
           overseer.writePendingUpdates();
         }
@@ -281,7 +281,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
           AddReplicaCmd.Response resp = responce.asyncFinalRunner.call();
           if (log.isDebugEnabled()) log.debug("Finalize after Command returned clusterstate={}", resp.clusterState);
           if (resp.clusterState != null) {
-            overseer.getZkStateWriter().enqueueUpdate(responce.clusterState, false);
+            overseer.getZkStateWriter().enqueueUpdate(resp.clusterState, null, false); // enqueue the state produced by the final runner (the one null-checked above)
             overseer.writePendingUpdates();
           }
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index a2b9b22..0634699 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -348,7 +348,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 //        firstReplicaFutures.add(future);
       }
 
-      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState, false);
+      ocmh.overseer.getZkStateWriter().enqueueUpdate(clusterState, null, false); // full cluster-state update: no message needed
       ocmh.overseer.writePendingUpdates();
       firstReplicaFutures.forEach(future -> {
         try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index eeaecaf..da787b9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud.overseer;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
@@ -37,9 +39,11 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.BoundedTreeSet;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -81,16 +85,17 @@ public class ZkStateWriter {
     cs = zkStateReader.getClusterState();
   }
 
-  public void enqueueUpdate(ClusterState clusterState, boolean stateUpdate) throws Exception {
+  public void enqueueUpdate(ClusterState clusterState, ZkNodeProps message, boolean stateUpdate) throws Exception {
 
     if (log.isDebugEnabled()) log.debug("enqueue update stateUpdate={}", stateUpdate);
     ourLock.lock();
     try {
       AtomicBoolean changed = new AtomicBoolean();
-      if (clusterState == null) {
-        throw new NullPointerException("clusterState cannot be null");
-      }
+      // Full-state updates (stateUpdate == false) require clusterState; state updates (stateUpdate == true) require message.
       if (!stateUpdate) {
+        if (clusterState == null) {
+          throw new NullPointerException("clusterState cannot be null");
+        }
         changed.set(true);
         clusterState.forEachCollection(collection -> {
           DocCollection currentCollection = cs.getCollectionOrNull(collection.getName());
@@ -100,7 +105,12 @@ public class ZkStateWriter {
               Slice currentSlice = currentCollection.getSlice(slice.getName());
               if (currentSlice != null) {
                 slice.setState(currentSlice.getState());
+                Replica leader = currentSlice.getLeader();
-                slice.setLeader(currentSlice.getLeader());
+                slice.setLeader(leader);
+                if (leader != null) {
+                  leader.setState(Replica.State.ACTIVE); // NOTE(review): mutates the Replica held by the current state (cs), which the new slice now shares
+                  leader.getProperties().put("leader", "true");
+                }
               }
             }
 
@@ -126,112 +136,97 @@ public class ZkStateWriter {
 
         this.cs = clusterState;
       } else {
-        clusterState.forEachCollection(newCollection -> {
-
-          DocCollection currentCollection = cs.getCollectionOrNull(newCollection.getName());
-          if (currentCollection == null) {
-            log.error("Could not update state for non existing collection {}", newCollection.getName());
-            return;
-          }
-          for (Slice slice : newCollection) {
-            Slice currentSlice = currentCollection.getSlice(slice.getName());
-            if (currentSlice != null) {
-              if (log.isDebugEnabled()) log.debug("set slice state to {} {} leader={}", slice.getName(), slice.getState(), slice.getLeader());
-              Replica leader = slice.getLeader();
-              if (leader != null) {
-                currentSlice.setState(slice.getState());
-                currentSlice.setLeader(slice.getLeader());
-                currentSlice.getLeader().getProperties().put("leader", "true");
-                currentSlice.getLeader().getProperties().put("state", Replica.State.ACTIVE.toString());
-                changed.set(true);
+        final String operation = message.getStr(Overseer.QUEUE_OPERATION);
+        OverseerAction overseerAction = OverseerAction.get(operation);
+        if (overseerAction == null) {
+          throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
+        }
+        switch (overseerAction) {
+          case STATE:
+            log.info("state cmd");
+            String collection = message.getStr("collection");
+            DocCollection docColl = cs.getCollectionOrNull(collection);
+            if (docColl != null) {
+              Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
+              if (replica != null) {
+                Replica.State state = Replica.State.getState((String) message.get(ZkStateReader.STATE_PROP));
+                log.info("set state {} {}", state, replica);
+                if (state != replica.getState()) {
+                  replica.setState(state);
+                  changed.set(true);
+                  collectionsToWrite.add(collection);
+                }
               }
             }
-            for (Replica replica : slice) {
-              Replica currentReplica = currentCollection.getReplica(replica.getName());
-              if (currentReplica != null) {
-                if (log.isDebugEnabled()) log.debug("set replica state to {} isLeader={}", replica.getState(), replica.getProperty("leader"));
-                currentReplica.setState(replica.getState());
-                String leader = replica.getProperty("leader");
-                if (leader != null || slice.getLeader() != null && replica.getName().equals(slice.getLeader().getName())) {
-                  currentReplica.getProperties().put("leader", "true");
-                  currentReplica.getProperties().put("state", Replica.State.ACTIVE.toString());
+            break;
+          case LEADER:
+            log.info("leader cmd");
+            collection = message.getStr("collection");
+            docColl = cs.getCollectionOrNull(collection);
+            if (docColl != null) {
+              Slice slice = docColl.getSlice(message.getStr("shard"));
+              if (slice != null) {
+                Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
+                if (replica != null) {
+                  log.info("set leader {} {}", message.getStr(ZkStateReader.CORE_NAME_PROP), replica);
+                  slice.setLeader(replica);
+                  replica.setState(Replica.State.ACTIVE);
+                  replica.getProperties().put("leader", "true");
+                  Collection<Replica> replicas = slice.getReplicas();
+                  for (Replica r : replicas) {
+                    if (r != replica) {
+                      r.getProperties().remove("leader");
+                    }
+                  }
+                  changed.set(true);
+                  collectionsToWrite.add(collection);
                 }
-                // nocommit
-                //              else if (leader == null) {
-                //                currentReplica.getProperties().remove("leader");
-                //              }
-
-                if (slice.getLeader() != null && slice.getLeader().getName().equals(replica.getName())) {
-                  currentReplica.getProperties().put("leader", "true");
-                  currentReplica.getProperties().put("state", Replica.State.ACTIVE.toString());
+              }
+            }
+            break;
+          // ADDROUTINGRULE / REMOVEROUTINGRULE previously delegated to SliceMutator.addRoutingRule and
+          // SliceMutator.removeRoutingRule. They have no case here, so such messages reach the default
+          // branch below and throw RuntimeException("unknown operation:...").
+          // TODO(review): restore routing-rule handling if these operations can still reach this writer.
+          case UPDATESHARDSTATE:
+            collection = message.getStr("collection");
+            message.getProperties().remove("collection"); // NOTE(review): mutates the caller's message so only shard-name keys remain for the loop below
+            message.getProperties().remove("operation");
+
+              docColl = cs.getCollectionOrNull(collection);
+              if (docColl != null) {
+                for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+                  Slice slice = docColl.getSlice(entry.getKey());
+                  if (slice != null) {
+                    Slice.State state = Slice.State.getState((String) entry.getValue());
+                    if (slice.getState() != state) {
+                      slice.setState(state);
+                      changed.set(true);
+                      collectionsToWrite.add(collection);
+                    }
+                  }
                 }
-
-                Replica thereplica = cs.getCollectionOrNull(newCollection.getName()).getReplica(replica.getName());
-                if (log.isDebugEnabled()) log.debug("Check states nreplica={} ceplica={}", replica.getState(), thereplica.getState());
-
-                if (replica.getState() == Replica.State.ACTIVE) {
-                  if (log.isDebugEnabled()) log.debug("Setting replica to active state leader={} state={} col={}", leader, cs, currentCollection);
+              }
+            break;
+          case DOWNNODE: // NOTE(review): marks every replica of the named collection DOWN; no node_name filter is applied
+            collection = message.getStr("collection");
+            docColl = cs.getCollectionOrNull(collection);
+            if (docColl != null) {
+              List<Replica> replicas = docColl.getReplicas();
+              for (Replica replica : replicas) {
+                if (replica.getState() != Replica.State.DOWN) {
+                  replica.setState(Replica.State.DOWN);
+                  changed.set(true);
+                  collectionsToWrite.add(collection);
                 }
-
-                changed.set(true); // nocommit - only if really changed
               }
-
             }
-          }
-        });
+            break;
+          default:
+            throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
 
-        cs.forEachCollection(collection -> {
-          Object removed = collection.getProperties().remove("replicationFactor");
-          if (removed != null) {
-            changed.set(true); // nocommit - only if really changed
-          }
-          removed = collection.getProperties().remove("pullReplicas");
-          if (removed != null) {
-            changed.set(true); // nocommit - only if really changed
-          }
-          removed = collection.getProperties().remove("maxShardsPerNode");
-          if (removed != null) {
-            changed.set(true); // nocommit - only if really changed
-          }
-          removed = collection.getProperties().remove("nrtReplicas");
-          if (removed != null) {
-            changed.set(true); // nocommit - only if really changed
-          }
-          removed = collection.getProperties().remove("tlogReplicas");
-          if (removed != null) {
-            changed.set(true); // nocommit - only if really changed
-          }
-
-          for (Slice slice : collection) {
-            Replica leader = slice.getLeader();
-            if (leader != null && leader.getState() != Replica.State.ACTIVE) {
-              slice.setLeader(null);
-              leader.getProperties().remove("leader");
-              changed.set(true);
-            }
-
-            for (Replica replica : slice) {
-              String isLeader = replica.getProperty("leader");
-              if (log.isDebugEnabled()) log.debug("isleader={} slice={} state={} sliceLeader={}", isLeader, slice.getName(), slice.getState(), slice.getLeader());
-              if (Boolean.parseBoolean(isLeader) && replica.getState() != Replica.State.ACTIVE) {
-                if (log.isDebugEnabled()) log.debug("clear leader isleader={} slice={} state={} sliceLeader={}", isLeader, slice.getName(), slice.getState(), slice.getLeader());
-                replica.getProperties().remove("leader");
-                changed.set(true); // nocommit - only if really changed
-              }
-
-              removed = replica.getProperties().remove("numShards");
-              if (removed != null) {
-                changed.set(true); // nocommit - only if really changed
-              }
-              removed = replica.getProperties().remove("base_url");
-              if (removed != null) {
-                changed.set(true); // nocommit - only if really changed
-              }
+        }
 
-            }
-          }
-        });
-        collectionsToWrite.addAll(clusterState.getCollectionsMap().keySet());
       }
 
       if (stateUpdate) {
@@ -338,12 +333,20 @@ public class ZkStateWriter {
                 Integer version;
                 if (v != null) {
                   version = v;
+                  lastVersion.set(version);
+                  reader.getZkClient().setData(path, data, version, true);
                 } else {
-                  version = reader.getZkClient().exists(path, null).getVersion();
+                  Stat existsStat = reader.getZkClient().exists(path, null);
+                  if (existsStat == null) {
+                    version = 0; // NOTE(review): a newly created znode has version 0, yet trackVersions below records version + 1 (= 1); confirm the next conditional setData expects that
+                    lastVersion.set(version);
+                    reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
+                  } else {
+                    version = existsStat.getVersion();
+                    lastVersion.set(version);
+                    reader.getZkClient().setData(path, data, version, true);
+                  }
                 }
-                lastVersion.set(version);
-                reader.getZkClient().setData(path, data, version, true);
-
                 trackVersions.put(collection.getName(), version + 1);
               } catch (KeeperException.NoNodeException e) {
                 if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 2faa060..8435f0a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -222,7 +222,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     final Set<T> stateWatchers = ConcurrentHashMap.newKeySet();
 
     public boolean canBeRemoved() {
-      return coreRefCount.get() <=0 && stateWatchers.size() <= 0;
+      return coreRefCount.get() < 0 && stateWatchers.size() <= 0; // NOTE(review): true only once coreRefCount goes negative, so a collection with zero referencing cores and no watchers is never removed; confirm intended
     }
 
   }
@@ -1916,7 +1916,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
   private boolean updateWatchedCollection(String coll, DocCollection newState) {
 
     if (newState == null) {
-      log.debug("Removing cached collection state for [{}]", coll);
+      if (log.isDebugEnabled()) log.debug("Removing cached collection state for [{}]", coll);
       watchedCollectionStates.remove(coll);
       return true;
     }