You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2016/02/09 05:24:53 UTC
lucene-solr git commit: SOLR-7281: Add an overseer action to publish
an entire node as 'down'.
Repository: lucene-solr
Updated Branches:
refs/heads/master 1f66406cb -> e78002bdc
SOLR-7281: Add an overseer action to publish an entire node as 'down'.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e78002bd
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e78002bd
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e78002bd
Branch: refs/heads/master
Commit: e78002bdc165188e219171f81a7a38cda592b5b7
Parents: 1f66406
Author: markrmiller <ma...@apache.org>
Authored: Mon Feb 8 23:24:43 2016 -0500
Committer: markrmiller <ma...@apache.org>
Committed: Mon Feb 8 23:24:43 2016 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/ElectionContext.java | 16 ++--
.../java/org/apache/solr/cloud/Overseer.java | 50 +++++++-----
.../org/apache/solr/cloud/ZkController.java | 50 +++++++-----
.../apache/solr/cloud/overseer/NodeMutator.java | 85 ++++++++++++++++++++
.../solr/cloud/overseer/OverseerAction.java | 3 +-
.../org/apache/solr/core/CoreContainer.java | 2 +-
7 files changed, 155 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ff4acd1..4767aa5 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -488,6 +488,8 @@ Optimizations
* SOLR-8615: Just like creating cores, we should use multiple threads when closing cores.
(Mark Miller)
+* SOLR-7281: Add an overseer action to publish an entire node as 'down'. (Mark Miller, shalin)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 30db6f1..10ac105 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -485,13 +485,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} else {
try (SolrCore core = cc.getCore(coreName)) {
- final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
- core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
- if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
- log.warn("The previous leader marked me " + core.getName()
- + " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader.");
-
- throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership");
+ if (core != null) {
+ final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
+ core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+ if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
+ log.warn("The previous leader marked me " + core.getName()
+ + " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader.");
+
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 53611c0..3fe2e5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -37,6 +38,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
+import org.apache.solr.cloud.overseer.NodeMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.ReplicaMutator;
import org.apache.solr.cloud.overseer.SliceMutator;
@@ -270,10 +272,10 @@ public class Overseer implements Closeable {
private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
final String operation = message.getStr(QUEUE_OPERATION);
- ZkWriteCommand zkWriteCommand = null;
+ List<ZkWriteCommand> zkWriteCommands = null;
final TimerContext timerContext = stats.time(operation);
try {
- zkWriteCommand = processMessage(clusterState, message, operation);
+ zkWriteCommands = processMessage(clusterState, message, operation);
stats.success(operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
@@ -286,8 +288,10 @@ public class Overseer implements Closeable {
} finally {
timerContext.stop();
}
- if (zkWriteCommand != null) {
- clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
+ if (zkWriteCommands != null) {
+ for (ZkWriteCommand zkWriteCommand : zkWriteCommands) {
+ clusterState = zkStateWriter.enqueueUpdate(clusterState, zkWriteCommand, callback);
+ }
if (!enableBatching) {
clusterState = zkStateWriter.writePendingUpdates();
}
@@ -334,37 +338,37 @@ public class Overseer implements Closeable {
}
}
- private ZkWriteCommand processMessage(ClusterState clusterState,
+ private List<ZkWriteCommand> processMessage(ClusterState clusterState,
final ZkNodeProps message, final String operation) {
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
if (collectionAction != null) {
switch (collectionAction) {
case CREATE:
- return new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message);
+ return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).createCollection(clusterState, message));
case DELETE:
- return new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message);
+ return Collections.singletonList(new ClusterStateMutator(getZkStateReader()).deleteCollection(clusterState, message));
case CREATESHARD:
- return new CollectionMutator(getZkStateReader()).createShard(clusterState, message);
+ return Collections.singletonList(new CollectionMutator(getZkStateReader()).createShard(clusterState, message));
case DELETESHARD:
- return new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message);
+ return Collections.singletonList(new CollectionMutator(getZkStateReader()).deleteShard(clusterState, message));
case ADDREPLICA:
- return new SliceMutator(getZkStateReader()).addReplica(clusterState, message);
+ return Collections.singletonList(new SliceMutator(getZkStateReader()).addReplica(clusterState, message));
case ADDREPLICAPROP:
- return new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message);
+ return Collections.singletonList(new ReplicaMutator(getZkStateReader()).addReplicaProperty(clusterState, message));
case DELETEREPLICAPROP:
- return new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message);
+ return Collections.singletonList(new ReplicaMutator(getZkStateReader()).deleteReplicaProperty(clusterState, message));
case BALANCESHARDUNIQUE:
ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(clusterState, message);
if (dProp.balanceProperty()) {
String collName = message.getStr(ZkStateReader.COLLECTION_PROP);
- return new ZkWriteCommand(collName, dProp.getDocCollection());
+ return Collections.singletonList(new ZkWriteCommand(collName, dProp.getDocCollection()));
}
break;
case MODIFYCOLLECTION:
CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties());
- return new CollectionMutator(reader).modifyCollection(clusterState,message);
+ return Collections.singletonList(new CollectionMutator(reader).modifyCollection(clusterState,message));
case MIGRATESTATEFORMAT:
- return new ClusterStateMutator(reader).migrateStateFormat(clusterState, message);
+ return Collections.singletonList(new ClusterStateMutator(reader).migrateStateFormat(clusterState, message));
default:
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
@@ -376,17 +380,17 @@ public class Overseer implements Closeable {
}
switch (overseerAction) {
case STATE:
- return new ReplicaMutator(getZkStateReader()).setState(clusterState, message);
+ return Collections.singletonList(new ReplicaMutator(getZkStateReader()).setState(clusterState, message));
case LEADER:
- return new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message);
+ return Collections.singletonList(new SliceMutator(getZkStateReader()).setShardLeader(clusterState, message));
case DELETECORE:
- return new SliceMutator(getZkStateReader()).removeReplica(clusterState, message);
+ return Collections.singletonList(new SliceMutator(getZkStateReader()).removeReplica(clusterState, message));
case ADDROUTINGRULE:
- return new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message);
+ return Collections.singletonList(new SliceMutator(getZkStateReader()).addRoutingRule(clusterState, message));
case REMOVEROUTINGRULE:
- return new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message);
+ return Collections.singletonList(new SliceMutator(getZkStateReader()).removeRoutingRule(clusterState, message));
case UPDATESHARDSTATE:
- return new SliceMutator(getZkStateReader()).updateShardState(clusterState, message);
+ return Collections.singletonList(new SliceMutator(getZkStateReader()).updateShardState(clusterState, message));
case QUIT:
if (myId.equals(message.get("id"))) {
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
@@ -396,12 +400,14 @@ public class Overseer implements Closeable {
log.warn("Overseer received wrong QUIT message {}", message);
}
break;
+ case DOWNNODE:
+ return new NodeMutator(getZkStateReader()).downNode(clusterState, message);
default:
throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
}
}
- return ZkStateWriter.NO_OP;
+ return Collections.singletonList(ZkStateWriter.NO_OP);
}
private LeaderStatus amILeader() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 44af3f1..bfd9e76 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -387,28 +387,7 @@ public final class ZkController {
if (descriptors != null) {
// before registering as live, make sure everyone is in a
// down state
- for (CoreDescriptor descriptor : descriptors) {
- try {
- descriptor.getCloudDescriptor().setLeader(false);
- publish(descriptor, Replica.State.DOWN, updateLastPublished);
- } catch (Exception e) {
- if (isClosed) {
- return;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- }
- try {
- publish(descriptor, Replica.State.DOWN);
- } catch (Exception e2) {
- SolrException.log(log, "", e2);
- continue;
- }
- }
- }
-
+ publishNodeAsDown(getNodeName());
for (CoreDescriptor descriptor : descriptors) {
// if it looks like we are going to be the leader, we don't
// want to wait for the following stuff
@@ -2504,4 +2483,31 @@ public final class ZkController {
}
return false;
}
+
+
+ /**
+ * Best effort to set DOWN state for all replicas on node.
+ *
+ * @param nodeName to operate on
+ */
+ public void publishNodeAsDown(String nodeName) {
+ log.info("Publish node={} as DOWN", nodeName);
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
+ ZkStateReader.NODE_NAME_PROP, nodeName);
+ try {
+ Overseer.getInQueue(getZkClient()).offer(Utils.toJSON(m));
+ } catch (KeeperException e) {
+ log.info("Could not publish node as down: " + e.getMessage());
+ } catch (RuntimeException e) {
+ Throwable rootCause = SolrException.getRootCause(e);
+ if (rootCause instanceof KeeperException) {
+ log.info("Could not publish node as down: " + e.getMessage());
+ } else {
+ throw e;
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ log.info("", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
new file mode 100644
index 0000000..0784cd4
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.overseer;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeMutator {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public NodeMutator(ZkStateReader zkStateReader) {
+
+ }
+
+ public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps message) {
+ List<ZkWriteCommand> zkWriteCommands = new ArrayList<ZkWriteCommand>();
+ String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
+
+ log.info("DownNode state invoked for node: " + nodeName);
+
+ Set<String> collections = clusterState.getCollections();
+ for (String collection : collections) {
+
+ Map<String,Slice> slicesCopy = new LinkedHashMap<>(clusterState.getSlicesMap(collection));
+
+ Set<Entry<String,Slice>> entries = slicesCopy.entrySet();
+ for (Entry<String,Slice> entry : entries) {
+ Slice slice = clusterState.getSlice(collection, entry.getKey());
+ Map<String,Replica> newReplicas = new HashMap<String,Replica>();
+
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica replica : replicas) {
+ Map<String,Object> props = replica.shallowCopy();
+ String rNodeName = replica.getNodeName();
+ if (rNodeName.equals(nodeName)) {
+ log.info("Update replica state for " + replica + " to " + Replica.State.DOWN.toString());
+ props.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+ }
+
+ Replica newReplica = new Replica(replica.getName(), props);
+ newReplicas.put(replica.getName(), newReplica);
+ }
+
+ Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy());
+ slicesCopy.put(slice.getName(), newSlice);
+
+ }
+
+ zkWriteCommands.add(new ZkWriteCommand(collection, clusterState.getCollection(collection).copyWithSlices(slicesCopy)));
+ }
+
+ return zkWriteCommands;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
index ad766a3..ea00806 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/OverseerAction.java
@@ -33,7 +33,8 @@ public enum OverseerAction {
REMOVEROUTINGRULE,
UPDATESHARDSTATE,
STATE,
- QUIT;
+ QUIT,
+ DOWNNODE;
public static OverseerAction get(String p) {
if (p != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e78002bd/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index a14ba86..426a493 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -554,7 +554,7 @@ public class CoreContainer {
if (isZooKeeperAware()) {
cancelCoreRecoveries();
- zkSys.publishCoresAsDown(solrCores.getCores());
+ zkSys.zkController.publishNodeAsDown(zkSys.zkController.getNodeName());
}
try {