You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2016/02/09 05:24:53 UTC

lucene-solr git commit: SOLR-7281: Add an overseer action to publish an entire node as 'down'.

Repository: lucene-solr
Updated Branches:
  refs/heads/master 1f66406cb -> e78002bdc


SOLR-7281: Add an overseer action to publish an entire node as 'down'.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e78002bd
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e78002bd
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e78002bd

Branch: refs/heads/master
Commit: e78002bdc165188e219171f81a7a38cda592b5b7
Parents: 1f66406
Author: markrmiller <ma...@apache.org>
Authored: Mon Feb 8 23:24:43 2016 -0500
Committer: markrmiller <ma...@apache.org>
Committed: Mon Feb 8 23:24:43 2016 -0500

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../org/apache/solr/cloud/ElectionContext.java  | 16 ++--
 .../java/org/apache/solr/cloud/Overseer.java    | 50 +++++++-----
 .../org/apache/solr/cloud/ZkController.java     | 50 +++++++-----
 .../apache/solr/cloud/overseer/NodeMutator.java | 85 ++++++++++++++++++++
 .../solr/cloud/overseer/OverseerAction.java     |  3 +-
 .../org/apache/solr/core/CoreContainer.java     |  2 +-
 7 files changed, 155 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ff4acd1..4767aa5 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -488,6 +488,8 @@ Optimizations
 * SOLR-8615: Just like creating cores, we should use multiple threads when closing cores.
   (Mark Miller)
 
+* SOLR-7281: Add an overseer action to publish an entire node as 'down'. (Mark Miller, shalin)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 30db6f1..10ac105 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -485,13 +485,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       
     } else {
       try (SolrCore core = cc.getCore(coreName)) {
-        final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
-            core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
-        if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
-          log.warn("The previous leader marked me " + core.getName()
-              + " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader.");
-          
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership");
+        if (core != null) {
+          final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
+              core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+          if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
+            log.warn("The previous leader marked me " + core.getName()
+                + " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader.");
+
+            throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership");
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 53611c0..3fe2e5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -37,6 +38,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.cloud.overseer.CollectionMutator;
+import org.apache.solr.cloud.overseer.NodeMutator;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.cloud.overseer.ReplicaMutator;
 import org.apache.solr.cloud.overseer.SliceMutator;
@@ -270,10 +272,10 @@ public class Overseer implements Closeable {
 
     private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
       final String operation = message.getStr(QUEUE_OPERATION);
-      ZkWriteCommand zkWriteCommand = null;
+      List<ZkWriteCommand> zkWriteCommands = null;
       final TimerContext timerContext = stats.time(operation);
       try {
-        zkWriteCommand = processMessage(clusterState, message, operation);
+        zkWriteCommands = processMessage(clusterState, message, operation);
         stats.success(operation);
       } catch (Exception e) {
         // generally there is nothing we can do - in most cases, we have
@@ -286,8 +288,10 @@ public class Overseer implements Closeable {
       } finally {
         timerContext.stop();
       }
-      if (zkWriteCommand != null) {
-        clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
+      if (zkWriteCommands != null) {
+        for (ZkWriteCommand zkWriteCommand : zkWriteCommands) {
+          clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
+        }
         if (!enableBatching)  {
           clusterState = zkStateWriter.writePendingUpdates();
         }
@@ -334,37 +338,37 @@ public class Overseer implements Closeable {
       }
     }
 
-    private ZkWriteCommand processMessage(ClusterState clusterState,
+    private List<ZkWriteCommand> processMessage(ClusterState clusterState,
         final ZkNodeProps message, final String operation) {
       CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
       if (collectionAction != null) {
         switch (collectionAction) {
           case CREATE:
-            return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
+            return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message));
           case DELETE:
-            return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message);
+            return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message));
           case CREATESHARD:
-            return new CollectionMutator(getZkStateReader()).createShard(clusterState, message);
+            return Collections.singletonList(new CollectionMutator(getZkStateReader()).createShard(clusterState, message));
           case DELETESHARD:
-            return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message);
+            return Collections.singletonList(new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message));
           case ADDREPLICA:
-            return new SliceMutator(getZkStateReader()).addReplica(clusterState, message);
+            return Collections.singletonList(new SliceMutator(getZkStateReader()).addReplica(clusterState, message));
           case ADDREPLICAPROP:
-            return new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message);
+            return Collections.singletonList(new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message));
           case DELETEREPLICAPROP:
-            return new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message);
+            return Collections.singletonList(new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message));
           case BALANCESHARDUNIQUE:
             ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
             if (dProp.balanceProperty()) {
               String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
-              return new ZkWriteCommand(collName, dProp.getDocCollection());
+              return Collections.singletonList(new ZkWriteCommand(collName, dProp.getDocCollection()));
             }
             break;
           case MODIFYCOLLECTION:
             CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties());
-            return new CollectionMutator(reader).modifyCollection(clusterState,message);
+            return Collections.singletonList(new CollectionMutator(reader).modifyCollection(clusterState,message));
           case MIGRATESTATEFORMAT:
-            return new ClusterStateMutator(reader).migrateStateFormat(clusterState, message);
+            return Collections.singletonList(new ClusterStateMutator(reader).migrateStateFormat(clusterState, message));
           default:
             throw new RuntimeException("unknown operation:" + operation
                 + " contents:" + message.getProperties());
@@ -376,17 +380,17 @@ public class Overseer implements Closeable {
         }
         switch (overseerAction) {
           case STATE:
-            return new ReplicaMutator(getZkStateReader()).setState(clusterState, message);
+            return Collections.singletonList(new ReplicaMutator(getZkStateReader()).setState(clusterState, message));
           case LEADER:
-            return new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message);
+            return Collections.singletonList(new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message));
           case DELETECORE:
-            return new SliceMutator(getZkStateReader()).removeReplica(clusterState, message);
+            return Collections.singletonList(new SliceMutator(getZkStateReader()).removeReplica(clusterState, message));
           case ADDROUTINGRULE:
-            return new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message);
+            return Collections.singletonList(new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message));
           case REMOVEROUTINGRULE:
-            return new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message);
+            return Collections.singletonList(new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message));
           case UPDATESHARDSTATE:
-            return new SliceMutator(getZkStateReader()).updateShardState(clusterState, message);
+            return Collections.singletonList(new SliceMutator(getZkStateReader()).updateShardState(clusterState, message));
           case QUIT:
             if (myId.equals(message.get("id"))) {
               log.info("Quit command received {}", LeaderElector.getNodeName(myId));
@@ -396,12 +400,14 @@ public class Overseer implements Closeable {
               log.warn("Overseer received wrong QUIT message {}", message);
             }
             break;
+          case DOWNNODE:
+            return new NodeMutator(getZkStateReader()).downNode(clusterState, message);
           default:
             throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
         }
       }
 
-      return ZkStateWriter.NO_OP;
+      return Collections.singletonList(ZkStateWriter.NO_OP);
     }
 
     private LeaderStatus amILeader() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 44af3f1..bfd9e76 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -387,28 +387,7 @@ public final class ZkController {
     if (descriptors != null) {
       // before registering as live, make sure everyone is in a
       // down state
-      for (CoreDescriptor descriptor : descriptors) {
-        try {
-          descriptor.getCloudDescriptor().setLeader(false);
-          publish(descriptor, Replica.State.DOWN, updateLastPublished);
-        } catch (Exception e) {
-          if (isClosed) {
-            return;
-          }
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e1) {
-            Thread.currentThread().interrupt();
-          }
-          try {
-            publish(descriptor, Replica.State.DOWN);
-          } catch (Exception e2) {
-            SolrException.log(log, "", e2);
-            continue;
-          }
-        }
-      }
-
+      publishNodeAsDown(getNodeName()); 
       for (CoreDescriptor descriptor : descriptors) {
         // if it looks like we are going to be the leader, we don't
         // want to wait for the following stuff
@@ -2504,4 +2483,31 @@ public final class ZkController {
     }
     return false;
   }
+  
+  
+  /**
+   * Best effort to set DOWN state for all replicas on node, by queueing a single DOWNNODE message
+   * on the Overseer in-queue. ZooKeeper ({@code KeeperException}) failures are logged, not thrown.
+   *
+   * @param nodeName to operate on
+   */
+  public void publishNodeAsDown(String nodeName) {
+    log.info("Publish node={} as DOWN", nodeName);
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
+        ZkStateReader.NODE_NAME_PROP, nodeName);
+    try {
+      Overseer.getInQueue(getZkClient()).offer(Utils.toJSON(m));
+    } catch (KeeperException e) {
+      log.info("Could not publish node as down: {}", e.getMessage());
+    } catch (RuntimeException e) {
+      Throwable rootCause = SolrException.getRootCause(e);
+      if (!(rootCause instanceof KeeperException)) {
+        throw e;
+      }
+      log.info("Could not publish node as down: {}", e.getMessage());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the flag; Thread.interrupted() would clear it
+      log.info("", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
new file mode 100644
index 0000000..0784cd4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.overseer;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Overseer mutator producing cluster-state updates that affect all replicas hosted on one node. */
+public class NodeMutator {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** @param zkStateReader not used by this class */
+  public NodeMutator(ZkStateReader zkStateReader) {
+  }
+
+  /**
+   * Marks every replica whose node name equals the message's {@code NODE_NAME_PROP} value as DOWN.
+   *
+   * @param clusterState current cluster state; only read here, updated copies are returned instead
+   * @param message overseer queue message carrying the node name
+   * @return one write command per collection that has at least one replica on the node
+   */
+  public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps message) {
+    List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
+    String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
+    log.info("DownNode state invoked for node: {}", nodeName);
+    Set<String> collections = clusterState.getCollections();
+    for (String collection : collections) {
+      Map<String,Slice> slicesCopy = new LinkedHashMap<>(clusterState.getSlicesMap(collection));
+      boolean collectionChanged = false;
+      for (Entry<String,Slice> entry : slicesCopy.entrySet()) {
+        Slice slice = entry.getValue();
+        Map<String,Replica> newReplicas = new HashMap<>();
+        Collection<Replica> replicas = slice.getReplicas();
+        for (Replica replica : replicas) {
+          Map<String,Object> props = replica.shallowCopy();
+          // Null-safe on both sides: a missing node name (message or replica) never matches.
+          if (nodeName != null && nodeName.equals(replica.getNodeName())) {
+            log.info("Update replica state for {} to {}", replica, Replica.State.DOWN);
+            props.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+            collectionChanged = true;
+          }
+          newReplicas.put(replica.getName(), new Replica(replica.getName(), props));
+        }
+        // setValue swaps the slice in place without structurally modifying the map being iterated.
+        entry.setValue(new Slice(slice.getName(), newReplicas, slice.shallowCopy()));
+      }
+      // Collections without a replica on the node are left alone: nothing changed, nothing to write.
+      if (collectionChanged) {
+        zkWriteCommands.add(new ZkWriteCommand(collection, clusterState.getCollection(collection).copyWithSlices(slicesCopy)));
+      }
+    }
+    return zkWriteCommands;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
index ad766a3..ea00806 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
@@ -33,7 +33,8 @@ public enum OverseerAction {
   REMOVEROUTINGRULE,
   UPDATESHARDSTATE,
   STATE,
-  QUIT;
+  QUIT,
+  DOWNNODE;
 
   public static OverseerAction get(String p) {
     if (p != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index a14ba86..426a493 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -554,7 +554,7 @@ public class CoreContainer {
 
     if (isZooKeeperAware()) {
       cancelCoreRecoveries();
-      zkSys.publishCoresAsDown(solrCores.getCores());
+      zkSys.zkController.publishNodeAsDown(zkSys.zkController.getNodeName()); 
     }
 
     try {