You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/29 03:22:55 UTC
svn commit: r1237194 - in /lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud: ElectionContext.java Overseer.java
Author: markrmiller
Date: Sun Jan 29 02:22:55 2012
New Revision: 1237194
URL: http://svn.apache.org/viewvc?rev=1237194&view=rev
Log:
SOLR-3065: Let overseer process cluster state changes asynchronously
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1237194&r1=1237193&r2=1237194&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sun Jan 29 02:22:55 2012
@@ -209,14 +209,31 @@ final class OverseerElectionContext exte
private final ZkStateReader stateReader;
public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
- super(zkNodeName, "/overseer_elect", null, null);
+ super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null);
this.zkClient = zkClient;
this.stateReader = stateReader;
}
@Override
void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement) throws KeeperException, InterruptedException {
- new Overseer(zkClient, stateReader);
+
+ final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
+ ZkNodeProps myProps = new ZkNodeProps("id", id);
+
+ try {
+ zkClient.makePath(leaderPath,
+ ZkStateReader.toJSON(myProps),
+ CreateMode.EPHEMERAL, true);
+ } catch (NodeExistsException e) {
+ // if a previous leader ephemeral still exists for some reason, try and
+ // remove it
+ zkClient.delete(leaderPath, -1, true);
+ zkClient.makePath(leaderPath,
+ ZkStateReader.toJSON(myProps),
+ CreateMode.EPHEMERAL, true);
+ }
+
+ new Overseer(zkClient, stateReader, id);
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1237194&r1=1237193&r2=1237194&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Sun Jan 29 02:22:55 2012
@@ -22,9 +22,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.Set;
import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
@@ -51,13 +53,32 @@ import org.slf4j.LoggerFactory;
* Cluster leader. Responsible node assignments, cluster state file?
*/
public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+
+ private static final int STATE_UPDATE_DELAY = 500; // delay between cloud state updates
+
+ static enum Op {
+ LeaderChange, StateChange;
+ }
+
+ private final class CloudStateUpdateRequest {
+
+ final Op operation;
+ final Object[] args;
+
+ CloudStateUpdateRequest(final Op operation, final Object... args) {
+ this.operation = operation;
+ this.args = args;
+ }
+ }
public static final String ASSIGNMENTS_NODE = "/node_assignments";
public static final String STATES_NODE = "/node_states";
private static Logger log = LoggerFactory.getLogger(Overseer.class);
private final SolrZkClient zkClient;
- private final ZkStateReader reader;
+
+ // pooled updates
+ private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo = new LinkedBlockingQueue<CloudStateUpdateRequest>();
// node stateWatches
private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
@@ -66,12 +87,222 @@ public class Overseer implements NodeSta
private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
private ZkCmdExecutor zkCmdExecutor;
- public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
- log.info("Constructing new Overseer");
+ private static class CloudStateUpdater implements Runnable {
+
+ private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo;
+ private final ZkStateReader reader;
+ private final SolrZkClient zkClient;
+ private final String myId;
+
+ public CloudStateUpdater(final LinkedBlockingQueue<CloudStateUpdateRequest> fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) {
+ this.fifo = fifo;
+ this.myId = myId;
+ this.reader = reader;
+ this.zkClient = zkClient;
+ }
+ @Override
+ public void run() {
+ while (amILeader()) {
+
+
+ LinkedList<CloudStateUpdateRequest> requests = new LinkedList<Overseer.CloudStateUpdateRequest>();
+ while (!fifo.isEmpty()) {
+ // collect all queued requests
+ CloudStateUpdateRequest req;
+ req = fifo.poll();
+ if (req == null) {
+ break;
+ }
+ requests.add(req);
+ }
+
+ if (requests.size() > 0) {
+ // process updates
+ synchronized (reader.getUpdateLock()) {
+ try {
+ reader.updateCloudState(true);
+ CloudState cloudState = reader.getCloudState();
+ for (CloudStateUpdateRequest request : requests) {
+
+ switch (request.operation) {
+ case LeaderChange:
+ cloudState = setShardLeader(cloudState,
+ (String) request.args[0], (String) request.args[1],
+ (String) request.args[2]);
+
+ break;
+ case StateChange:
+ cloudState = updateState(cloudState,
+ (String) request.args[0], (CoreState) request.args[1]);
+ break;
+
+ }
+ }
+
+ log.info("Announcing new cluster state");
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(cloudState), true);
+
+ } catch (KeeperException e) {
+ // XXX stop processing, exit
+ return;
+ } catch (InterruptedException e) {
+ // XXX stop processing, exit
+ return;
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(STATE_UPDATE_DELAY);
+ } catch (InterruptedException e) {
+ //
+ }
+ }
+ }
+
+ private boolean amILeader() {
+ try {
+ ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, false));
+ if(myId.equals(props.get("id"))) {
+ return true;
+ }
+ } catch (KeeperException e) {
+ // assume we're dead
+ } catch (InterruptedException e) {
+ // assume we're dead
+ }
+ log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
+ return false;
+ }
+ /**
+ * Try to assign core to the cluster
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
+ String collection = coreState.getCollectionName();
+ String zkCoreNodeName = coreState.getCoreNodeName();
+
+ String shardId;
+ if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
+ shardId = AssignShard.assignShard(collection, state);
+ } else {
+ shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
+ }
+
+ Map<String,String> props = new HashMap<String,String>();
+ for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ ZkNodeProps zkProps = new ZkNodeProps(props);
+ Slice slice = state.getSlice(collection, shardId);
+ Map<String,ZkNodeProps> shardProps;
+ if (slice == null) {
+ shardProps = new HashMap<String,ZkNodeProps>();
+ } else {
+ shardProps = state.getSlice(collection, shardId).getShardsCopy();
+ }
+ shardProps.put(zkCoreNodeName, zkProps);
+
+ slice = new Slice(shardId, shardProps);
+ CloudState newCloudState = updateSlice(state, collection, slice);
+ return newCloudState;
+ }
+
+ private CloudState updateSlice(CloudState state, String collection, Slice slice) {
+
+ final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+ newStates.putAll(state.getCollectionStates());
+
+ if (!newStates.containsKey(collection)) {
+ newStates.put(collection, new LinkedHashMap<String,Slice>());
+ }
+
+ final Map<String, Slice> slices = newStates.get(collection);
+ if (!slices.containsKey(slice.getName())) {
+ slices.put(slice.getName(), slice);
+ } else {
+ final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
+ final Slice existingSlice = slices.get(slice.getName());
+ shards.putAll(existingSlice.getShards());
+ //XXX preserve existing leader
+ for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
+ if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
+ HashMap<String, String> newProps = new HashMap<String,String>();
+ newProps.putAll(edit.getValue().getProperties());
+ newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
+ shards.put(edit.getKey(), new ZkNodeProps(newProps));
+ } else {
+ shards.put(edit.getKey(), edit.getValue());
+ }
+ }
+ final Slice updatedSlice = new Slice(slice.getName(), shards);
+ slices.put(slice.getName(), updatedSlice);
+ }
+ return new CloudState(state.getLiveNodes(), newStates);
+ }
+
+ private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
+
+ boolean updated = false;
+ final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+ newStates.putAll(state.getCollectionStates());
+
+ final Map<String, Slice> slices = newStates.get(collection);
+
+ if(slices==null) {
+ log.error("Could not mark shard leader for non existing collection.");
+ return state;
+ }
+
+ if (!slices.containsKey(sliceName)) {
+ log.error("Could not mark leader for non existing slice.");
+ return state;
+ } else {
+ final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
+ for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
+ Map<String, String> newShardProps = new LinkedHashMap<String,String>();
+ newShardProps.putAll(shard.getValue().getProperties());
+
+ String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
+
+ ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
+ if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
+ newShardProps.put(ZkStateReader.LEADER_PROP,"true");
+ if (wasLeader == null) {
+ updated = true;
+ }
+ } else {
+ if (wasLeader != null) {
+ updated = true;
+ }
+ }
+ newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
+ }
+ Slice slice = new Slice(sliceName, newShards);
+ slices.put(sliceName, slice);
+ }
+ if (updated) {
+ return new CloudState(state.getLiveNodes(), newStates);
+ } else {
+ return state;
+ }
+ }
+
+ }
+
+ public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
+ log.info("Constructing new Overseer id=" + id);
this.zkClient = zkClient;
this.zkCmdExecutor = new ZkCmdExecutor();
- this.reader = reader;
createWatches();
+
+ //launch cluster state updater thread
+ ThreadGroup tg = new ThreadGroup("Overseer delayed state updater");
+ Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id));
+ updaterThread.setDaemon(true);
+ updaterThread.start();
}
public synchronized void createWatches()
@@ -267,41 +498,6 @@ public class Overseer implements NodeSta
}
}
- /**
- * Try to assign core to the cluster
- * @throws KeeperException
- * @throws InterruptedException
- */
- private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
- String collection = coreState.getCollectionName();
- String zkCoreNodeName = coreState.getCoreNodeName();
-
- String shardId;
- if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
- shardId = AssignShard.assignShard(collection, state);
- } else {
- shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
- }
-
- Map<String,String> props = new HashMap<String,String>();
- for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
- props.put(entry.getKey(), entry.getValue());
- }
- ZkNodeProps zkProps = new ZkNodeProps(props);
- Slice slice = state.getSlice(collection, shardId);
- Map<String,ZkNodeProps> shardProps;
- if (slice == null) {
- shardProps = new HashMap<String,ZkNodeProps>();
- } else {
- shardProps = state.getSlice(collection, shardId).getShardsCopy();
- }
- shardProps.put(zkCoreNodeName, zkProps);
-
- slice = new Slice(shardId, shardProps);
- CloudState newCloudState = updateSlice(state, collection, slice);
- return newCloudState;
- }
-
private Set<String> complement(Collection<String> next,
Collection<String> prev) {
Set<String> downCollections = new HashSet<String>();
@@ -311,23 +507,11 @@ public class Overseer implements NodeSta
}
@Override
- public void coreChanged(final String nodeName, final Set<CoreState> states) throws KeeperException, InterruptedException {
- log.debug("Cores changed: " + nodeName + " states:" + states);
- synchronized(reader.getUpdateLock()) {
- reader.updateCloudState(true);
- CloudState cloudState = reader.getCloudState();
- for (CoreState state : states) {
- cloudState = updateState(cloudState, nodeName, state);
- }
-
- try {
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(cloudState), true);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Interrupted while publishing new state", e);
- }
+ public void coreChanged(final String nodeName, final Set<CoreState> states)
+ throws KeeperException, InterruptedException {
+ log.info("Core change pooled: " + nodeName + " states:" + states);
+ for (CoreState state : states) {
+ fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
}
}
@@ -340,111 +524,11 @@ public class Overseer implements NodeSta
ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
zkCmdExecutor.ensureExists(node, zkClient);
}
-
- private CloudState updateSlice(CloudState state, String collection, Slice slice) {
-
- final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- newStates.putAll(state.getCollectionStates());
-
- if (!newStates.containsKey(collection)) {
- newStates.put(collection, new LinkedHashMap<String,Slice>());
- }
-
- final Map<String, Slice> slices = newStates.get(collection);
- if (!slices.containsKey(slice.getName())) {
- slices.put(slice.getName(), slice);
- } else {
- final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
- final Slice existingSlice = slices.get(slice.getName());
- shards.putAll(existingSlice.getShards());
- //XXX preserve existing leader
- for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
- if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
- HashMap<String, String> newProps = new HashMap<String,String>();
- newProps.putAll(edit.getValue().getProperties());
- newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
- shards.put(edit.getKey(), new ZkNodeProps(newProps));
- } else {
- shards.put(edit.getKey(), edit.getValue());
- }
- }
- final Slice updatedSlice = new Slice(slice.getName(), shards);
- slices.put(slice.getName(), updatedSlice);
- }
- return new CloudState(state.getLiveNodes(), newStates);
- }
-
- private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
-
- boolean updated = false;
- final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
- newStates.putAll(state.getCollectionStates());
-
- final Map<String, Slice> slices = newStates.get(collection);
-
- if(slices==null) {
- log.error("Could not mark shard leader for non existing collection.");
- return state;
- }
-
- if (!slices.containsKey(sliceName)) {
- log.error("Could not mark leader for non existing slice.");
- return state;
- } else {
- final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
- for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
- Map<String, String> newShardProps = new LinkedHashMap<String,String>();
- newShardProps.putAll(shard.getValue().getProperties());
-
- String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clean any previously existed flag
-
- ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
- if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
- newShardProps.put(ZkStateReader.LEADER_PROP,"true");
- if (wasLeader == null) {
- updated = true;
- }
- } else {
- if (wasLeader != null) {
- updated = true;
- }
- }
- newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
- }
- Slice slice = new Slice(sliceName, newShards);
- slices.put(sliceName, slice);
- }
- if (updated) {
- return new CloudState(state.getLiveNodes(), newStates);
- } else {
- return state;
- }
- }
@Override
public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
- synchronized (reader.getUpdateLock()) {
- try {
- reader.updateCloudState(true); // get fresh copy of the state
- final CloudState state = reader.getCloudState();
- final CloudState newState = setShardLeader(state, collection, shardId,
- props.getCoreUrl());
- if (state != newState) { // if same instance was returned no need to
- // update state
- log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(newState), true);
-
- } else {
- log.debug("State was not changed.");
- }
- } catch (KeeperException e) {
- log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
- }
- }
+ log.info("Leader change pooled.");
+ fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, props.getCoreUrl()));
}
}
\ No newline at end of file