You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/06/08 07:21:29 UTC
svn commit: r1347880 - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/ core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/core/ core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
Author: siren
Date: Fri Jun 8 05:21:28 2012
New Revision: 1347880
URL: http://svn.apache.org/viewvc?rev=1347880&view=rev
Log:
SOLR-3511 refactor Overseer to use a queue
Added:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (with props)
Removed:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/NodeStateWatcherTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java Fri Jun 8 05:21:28 2012
@@ -19,13 +19,14 @@ package org.apache.solr;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
@@ -94,15 +95,11 @@ public class SolrLogFormatter extends Fo
methodAlias.put(new Method("org.apache.solr.update.processor.LogUpdateProcessor","finish"), "");
}
-
-
-
public static class CoreInfo {
- public static int maxCoreNum;
- public String shortId;
- public String url;
- CoreState coreState; // should be fine to keep a hard reference to this
- // CloudState cloudState; // should be fine to keep this hard reference since cloudstate is immutable and doesn't have pointers to anything heavyweight (like SolrCore, CoreContainer, etc)
+ static int maxCoreNum;
+ String shortId;
+ String url;
+ Map<String, String> coreProps;
}
Map<SolrCore, CoreInfo> coreInfoMap = new WeakHashMap<SolrCore, CoreInfo>(); // TODO: use something that survives across a core reload?
@@ -199,11 +196,15 @@ sb.append("(group_name=").append(tg.getN
sb.append(" url="+info.url + " node="+zkController.getNodeName());
}
- // look to see if local core state changed
- CoreState coreState = zkController.getCoreState(core.getName());
- if (coreState != info.coreState) {
- sb.append(" " + info.shortId + "_STATE=" + coreState);
- info.coreState = coreState;
+ if(info.coreProps == null) {
+ info.coreProps = getCoreProps(zkController, core);
+ }
+
+ Map<String, String> coreProps = getCoreProps(zkController, core);
+ if(!coreProps.equals(info.coreProps)) {
+ info.coreProps = coreProps;
+ final String corePropsString = "coll:" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + " core:" + core.getName() + " props:" + coreProps;
+ sb.append(" " + info.shortId + "_STATE=" + corePropsString);
}
}
}
@@ -260,6 +261,16 @@ sb.append("(group_name=").append(tg.getN
return sb.toString();
}
+ private Map<String,String> getCoreProps(ZkController zkController, SolrCore core) {
+ final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ ZkNodeProps props = zkController.getCloudState().getShardProps(collection, ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
+ if(props!=null) {
+ return props.getProperties();
+ }
+ return Collections.EMPTY_MAP;
+ }
+
+
private Method classAndMethod = new Method(null,null); // don't need to be thread safe
private String getShortClassName(String name, String method) {
classAndMethod.className = name;
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1347880&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Fri Jun 8 05:21:28 2012
@@ -0,0 +1,284 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A distributed queue from zk recipes.
+ */
+public class DistributedQueue {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DistributedQueue.class);
+
+ private final String dir;
+
+ private ZooKeeper zookeeper;
+ private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ private final String prefix = "qn-";
+
+ public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+ this.dir = dir;
+
+ if (acl != null) {
+ this.acl = acl;
+ }
+ this.zookeeper = zookeeper;
+
+ }
+
+ /**
+ * Returns a Map of the children, ordered by id.
+ *
+ * @param watcher
+ * optional watcher on getChildren() operation.
+ * @return map from id to child name for all children
+ */
+ private TreeMap<Long,String> orderedChildren(Watcher watcher)
+ throws KeeperException, InterruptedException {
+ TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();
+
+ List<String> childNames = null;
+ try {
+ childNames = zookeeper.getChildren(dir, watcher);
+ } catch (KeeperException.NoNodeException e) {
+ throw e;
+ }
+
+ for (String childName : childNames) {
+ try {
+ // Check format
+ if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
+ LOG.warn("Found child node with improper name: " + childName);
+ continue;
+ }
+ String suffix = childName.substring(prefix.length());
+ Long childId = new Long(suffix);
+ orderedChildren.put(childId, childName);
+ } catch (NumberFormatException e) {
+ LOG.warn("Found child node with improper format : " + childName + " "
+ + e, e);
+ }
+ }
+
+ return orderedChildren;
+ }
+
+ /**
+ * Return the head of the queue without modifying the queue.
+ *
+ * @return the data at the head of the queue.
+ * @throws NoSuchElementException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public byte[] element() throws NoSuchElementException, KeeperException,
+ InterruptedException {
+ TreeMap<Long,String> orderedChildren;
+
+ // element, take, and remove follow the same pattern.
+ // We want to return the child node with the smallest sequence number.
+ // Since other clients are remove()ing and take()ing nodes concurrently,
+ // the child with the smallest sequence number in orderedChildren might be
+ // gone by the time we check.
+ // We don't call getChildren again until we have tried the rest of the nodes
+ // in sequence order.
+ while (true) {
+ try {
+ orderedChildren = orderedChildren(null);
+ } catch (KeeperException.NoNodeException e) {
+ throw new NoSuchElementException();
+ }
+ if (orderedChildren.size() == 0) throw new NoSuchElementException();
+
+ for (String headNode : orderedChildren.values()) {
+ if (headNode != null) {
+ try {
+ return zookeeper.getData(dir + "/" + headNode, false, null);
+ } catch (KeeperException.NoNodeException e) {
+ // Another client removed the node first, try next
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Attempts to remove the head of the queue and return it.
+ *
+ * @return The former head of the queue
+ * @throws NoSuchElementException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public byte[] remove() throws NoSuchElementException, KeeperException,
+ InterruptedException {
+ TreeMap<Long,String> orderedChildren;
+ // Same as for element. Should refactor this.
+ while (true) {
+ try {
+ orderedChildren = orderedChildren(null);
+ } catch (KeeperException.NoNodeException e) {
+ throw new NoSuchElementException();
+ }
+ if (orderedChildren.size() == 0) throw new NoSuchElementException();
+
+ for (String headNode : orderedChildren.values()) {
+ String path = dir + "/" + headNode;
+ try {
+ byte[] data = zookeeper.getData(path, false, null);
+ zookeeper.delete(path, -1);
+ return data;
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first.
+ }
+ }
+
+ }
+ }
+
+ private class LatchChildWatcher implements Watcher {
+
+ CountDownLatch latch;
+
+ public LatchChildWatcher() {
+ latch = new CountDownLatch(1);
+ }
+
+ public void process(WatchedEvent event) {
+ LOG.debug("Watcher fired on path: " + event.getPath() + " state: "
+ + event.getState() + " type " + event.getType());
+ latch.countDown();
+ }
+
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+ }
+
+ /**
+ * Removes the head of the queue and returns it, blocks until it succeeds.
+ *
+ * @return The former head of the queue
+ * @throws NoSuchElementException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public byte[] take() throws KeeperException, InterruptedException {
+ TreeMap<Long,String> orderedChildren;
+ // Same as for element. Should refactor this.
+ while (true) {
+ LatchChildWatcher childWatcher = new LatchChildWatcher();
+ try {
+ orderedChildren = orderedChildren(childWatcher);
+ } catch (KeeperException.NoNodeException e) {
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+ continue;
+ }
+ if (orderedChildren.size() == 0) {
+ childWatcher.await();
+ continue;
+ }
+
+ for (String headNode : orderedChildren.values()) {
+ String path = dir + "/" + headNode;
+ try {
+ byte[] data = zookeeper.getData(path, false, null);
+ zookeeper.delete(path, -1);
+ return data;
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first.
+ }
+ }
+ }
+ }
+
+ /**
+ * Inserts data into queue.
+ *
+ * @param data
+ * @return true if data was successfully added
+ */
+ public boolean offer(byte[] data) throws KeeperException,
+ InterruptedException {
+ for (;;) {
+ try {
+ zookeeper.create(dir + "/" + prefix, data, acl,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+ return true;
+ } catch (KeeperException.NoNodeException e) {
+ try {
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException ne) {
+ //someone created it
+ }
+ }
+ }
+
+
+
+ }
+
+ /**
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty.
+ *
+ * @return data at the first element of the queue, or null.
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public byte[] peek() throws KeeperException, InterruptedException {
+ try {
+ return element();
+ } catch (NoSuchElementException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Attempts to remove the head of the queue and return it. Returns null if the
+ * queue is empty.
+ *
+ * @return Head of the queue or null.
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public byte[] poll() throws KeeperException, InterruptedException {
+ try {
+ return remove();
+ } catch (NoSuchElementException e) {
+ return null;
+ }
+ }
+
+}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Jun 8 05:21:28 2012
@@ -8,7 +8,6 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkClientConnectionStrategy;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -96,6 +95,13 @@ class ShardLeaderElectionContextBase ext
leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
CreateMode.EPHEMERAL, true);
}
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+ "leader", ZkStateReader.SHARD_ID_PROP, shardId,
+ ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
+ leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
+ ZkStateReader.CORE_NAME_PROP, leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP));
+ Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
}
}
@@ -240,10 +246,10 @@ final class OverseerElectionContext exte
private final SolrZkClient zkClient;
private final ZkStateReader stateReader;
- public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
+ public OverseerElectionContext(final String zkNodeName, ZkStateReader stateReader) {
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
- this.zkClient = zkClient;
this.stateReader = stateReader;
+ this.zkClient = stateReader.getZkClient();
}
@Override
@@ -265,7 +271,7 @@ final class OverseerElectionContext exte
CreateMode.EPHEMERAL, true);
}
- new Overseer(zkClient, stateReader, id);
+ new Overseer(stateReader, id);
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Jun 8 05:21:28 2012
@@ -17,157 +17,169 @@ package org.apache.solr.cloud;
* the License.
*/
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.Set;
-import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
-import org.apache.solr.cloud.ShardLeaderWatcher.ShardLeaderListener;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkOperation;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Cluster leader. Responsible node assignments, cluster state file?
*/
-public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+public class Overseer {
+ public static final String QUEUE_OPERATION = "operation";
private static final int STATE_UPDATE_DELAY = 500; // delay between cloud state updates
- static enum Op {
- LeaderChange, StateChange, CoreDeleted;
- }
-
- private final class CloudStateUpdateRequest {
-
- final Op operation;
- final Object[] args;
-
- CloudStateUpdateRequest(final Op operation, final Object... args) {
- this.operation = operation;
- this.args = args;
- }
- }
-
- public static final String STATES_NODE = "/node_states";
private static Logger log = LoggerFactory.getLogger(Overseer.class);
- private final SolrZkClient zkClient;
-
- // pooled updates
- private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo = new LinkedBlockingQueue<CloudStateUpdateRequest>();
-
- // node stateWatches
- private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
-
- // shard leader watchers (collection->slice->watcher)
- private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
- private ZkCmdExecutor zkCmdExecutor;
-
private static class CloudStateUpdater implements Runnable {
- private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo;
private final ZkStateReader reader;
private final SolrZkClient zkClient;
private final String myId;
-
- public CloudStateUpdater(final LinkedBlockingQueue<CloudStateUpdateRequest> fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) {
- this.fifo = fifo;
+ //queue where everybody can throw tasks
+ private final DistributedQueue stateUpdateQueue;
+ //Internal queue where overseer stores events that have not yet been published into cloudstate
+ //If Overseer dies while extracting the main queue a new overseer will start from this queue
+ private final DistributedQueue workQueue;
+
+ public CloudStateUpdater(final ZkStateReader reader, final String myId) {
+ this.zkClient = reader.getZkClient();
+ this.stateUpdateQueue = getInQueue(zkClient);
+ this.workQueue = getInternalQueue(zkClient);
this.myId = myId;
this.reader = reader;
- this.zkClient = zkClient;
}
- @Override
- public void run() {
- while (amILeader()) {
-
-
- LinkedList<CloudStateUpdateRequest> requests = new LinkedList<Overseer.CloudStateUpdateRequest>();
- while (!fifo.isEmpty()) {
- // collect all queued requests
- CloudStateUpdateRequest req;
- req = fifo.poll();
- if (req == null) {
- break;
- }
- requests.add(req);
- }
-
- if (requests.size() > 0) {
- // process updates
- synchronized (reader.getUpdateLock()) {
- try {
+
+ @Override
+ public void run() {
+
+ if(amILeader()) {
+ // see if there's something left from the previous Overseer and re
+ // process all events that were not persisted into cloud state
+ synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
+ try {
+ byte[] head = workQueue.peek();
+
+ if (head != null) {
reader.updateCloudState(true);
CloudState cloudState = reader.getCloudState();
- for (CloudStateUpdateRequest request : requests) {
-
- switch (request.operation) {
- case LeaderChange:
- cloudState = setShardLeader(cloudState,
- (String) request.args[0], (String) request.args[1],
- (String) request.args[2]);
-
- break;
- case StateChange:
- cloudState = updateState(cloudState,
- (String) request.args[0], (CoreState) request.args[1]);
- break;
-
- case CoreDeleted:
- cloudState = removeCore(cloudState, (String) request.args[0], (String) request.args[1]);
- break;
- }
+ log.info("Replaying operations from work queue.");
+
+ while (head != null) {
+ final ZkNodeProps message = ZkNodeProps.load(head);
+ final String operation = message
+ .get(QUEUE_OPERATION);
+ cloudState = processMessage(cloudState, message, operation);
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(cloudState), true);
+ workQueue.remove();
+ head = workQueue.peek();
}
-
- log.info("Announcing new cluster state");
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(cloudState), true);
-
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- SolrException.log(log, "", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("Solr cannot talk to ZK");
return;
}
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
}
}
-
+ }
+
+ log.info("Starting to work on the main queue");
+ while (amILeader()) {
+ synchronized (reader.getUpdateLock()) {
try {
- Thread.sleep(STATE_UPDATE_DELAY);
+ byte[] head = stateUpdateQueue.peek();
+
+ if (head != null) {
+ reader.updateCloudState(true);
+ CloudState cloudState = reader.getCloudState();
+
+ while (head != null) {
+ final ZkNodeProps message = ZkNodeProps.load(head);
+ final String operation = message.get(QUEUE_OPERATION);
+
+ cloudState = processMessage(cloudState, message, operation);
+ byte[] processed = stateUpdateQueue.remove();
+ workQueue.offer(processed);
+ head = stateUpdateQueue.peek();
+ }
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(cloudState), true);
+ }
+ // clean work queue
+ while (workQueue.poll() != null);
+
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("Overseer cannot talk to ZK");
+ return;
+ }
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ return;
}
}
+
+ try {
+ Thread.sleep(STATE_UPDATE_DELAY);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private CloudState processMessage(CloudState cloudState,
+ final ZkNodeProps message, final String operation)
+ throws KeeperException, InterruptedException {
+ if ("state".equals(operation)) {
+ cloudState = updateState(cloudState, message);
+ } else if ("deletecore".equals(operation)) {
+ cloudState = removeCore(cloudState, message);
+ } else if (ZkStateReader.LEADER_PROP.equals(operation)) {
+ StringBuilder sb = new StringBuilder();
+ String baseUrl = message.get(ZkStateReader.BASE_URL_PROP);
+ String coreName = message.get(ZkStateReader.CORE_NAME_PROP);
+ sb.append(baseUrl);
+ if (!baseUrl.endsWith("/")) sb.append("/");
+ sb.append(coreName == null ? "" : coreName);
+ if (!(sb.substring(sb.length() - 1).equals("/"))) sb
+ .append("/");
+ cloudState = setShardLeader(cloudState,
+ message.get(ZkStateReader.COLLECTION_PROP),
+ message.get(ZkStateReader.SHARD_ID_PROP), sb.toString());
+ } else {
+ throw new RuntimeException("unknown operation:" + operation
+ + " contents:" + message.getProperties());
}
+ return cloudState;
+ }
private boolean amILeader() {
try {
@@ -188,32 +200,35 @@ public class Overseer implements NodeSta
* @throws KeeperException
* @throws InterruptedException
*/
- private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
- String collection = coreState.getCollectionName();
- String zkCoreNodeName = coreState.getCoreNodeName();
+ private CloudState updateState(CloudState state, final ZkNodeProps message) throws KeeperException, InterruptedException {
+ final String collection = message.get(ZkStateReader.COLLECTION_PROP);
+ final String zkCoreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
+ final Integer numShards = message.get(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.get(ZkStateReader.NUM_SHARDS_PROP)):null;
//collection does not yet exist, create placeholders if num shards is specified
- if (!state.getCollections().contains(coreState.getCollectionName())
- && coreState.getNumShards() != null) {
- state = createCollection(state, collection, coreState.getNumShards());
+ if (!state.getCollections().contains(collection)
+ && numShards!=null) {
+ state = createCollection(state, collection, numShards);
}
// use the provided non null shardId
- String shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
- if(shardId==null) {
- //use shardId from CloudState
- shardId = getAssignedId(state, nodeName, coreState);
+ String shardId = message.get(ZkStateReader.SHARD_ID_PROP);
+ if (shardId == null) {
+ String nodeName = message.get(ZkStateReader.NODE_NAME_PROP);
+ //get shardId from CloudState
+ shardId = getAssignedId(state, nodeName, message);
}
- if(shardId==null) {
+ if(shardId == null) {
//request new shardId
- shardId = AssignShard.assignShard(collection, state, coreState.getNumShards());
+ shardId = AssignShard.assignShard(collection, state, numShards);
}
Map<String,String> props = new HashMap<String,String>();
- Map<String,String> coreProps = new HashMap<String,String>(coreState.getProperties().size());
- coreProps.putAll(coreState.getProperties());
+ Map<String,String> coreProps = new HashMap<String,String>(message.getProperties().size());
+ coreProps.putAll(message.getProperties());
// we don't put num_shards in the clusterstate
- coreProps.remove("num_shards");
+ coreProps.remove(ZkStateReader.NUM_SHARDS_PROP);
+ coreProps.remove(QUEUE_OPERATION);
for (Entry<String,String> entry : coreProps.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
@@ -249,9 +264,9 @@ public class Overseer implements NodeSta
* Return an already assigned id or null if not assigned
*/
private String getAssignedId(final CloudState state, final String nodeName,
- final CoreState coreState) {
- final String key = coreState.getProperties().get(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.getProperties().get(ZkStateReader.CORE_NAME_PROP);
- Map<String, Slice> slices = state.getSlices(coreState.getCollectionName());
+ final ZkNodeProps coreState) {
+ final String key = coreState.get(ZkStateReader.NODE_NAME_PROP) + "_" + coreState.get(ZkStateReader.CORE_NAME_PROP);
+ Map<String, Slice> slices = state.getSlices(coreState.get(ZkStateReader.COLLECTION_PROP));
if (slices != null) {
for (Slice slice : slices.values()) {
if (slice.getShards().get(key) != null) {
@@ -303,12 +318,12 @@ public class Overseer implements NodeSta
final Map<String, Slice> slices = newStates.get(collection);
if(slices==null) {
- log.error("Could not mark shard leader for non existing collection.");
+ log.error("Could not mark shard leader for non existing collection:" + collection);
return state;
}
if (!slices.containsKey(sliceName)) {
- log.error("Could not mark leader for non existing slice.");
+ log.error("Could not mark leader for non existing slice:" + sliceName);
return state;
} else {
final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
@@ -333,7 +348,11 @@ public class Overseer implements NodeSta
/*
* Remove core from cloudstate
*/
- private CloudState removeCore(final CloudState cloudState, final String collection, final String coreNodeName) {
+ private CloudState removeCore(final CloudState cloudState, ZkNodeProps message) {
+
+ final String coreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
+ final String collection = message.get(ZkStateReader.COLLECTION_PROP);
+
final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
for(String collectionName: cloudState.getCollections()) {
if(collection.equals(collectionName)) {
@@ -360,255 +379,42 @@ public class Overseer implements NodeSta
}
}
- public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
- log.info("Constructing new Overseer id=" + id);
- this.zkClient = zkClient;
- this.zkCmdExecutor = new ZkCmdExecutor();
- createWatches();
-
+ public Overseer(final ZkStateReader reader, final String id) throws KeeperException, InterruptedException {
+ log.info("Overseer (id=" + id + ") starting");
//launch cluster state updater thread
- ThreadGroup tg = new ThreadGroup("Overseer delayed state updater");
- Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id));
+ ThreadGroup tg = new ThreadGroup("Overseer state updater.");
+ Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
updaterThread.setDaemon(true);
updaterThread.start();
}
-
- public synchronized void createWatches()
- throws KeeperException, InterruptedException {
- addCollectionsWatch();
- addLiveNodesWatch();
- }
-
- /*
- * Watch for collections so we can add watches for its shard leaders.
- */
- private void addCollectionsWatch() throws KeeperException,
- InterruptedException {
-
- zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
-
- List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
- @Override
- public void process(WatchedEvent event) {
- try {
- List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this, true);
- collectionsChanged(collections);
- } catch (KeeperException e) {
- if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("", e);
- }
- }
- }, true);
-
- collectionsChanged(collections);
- }
-
- private void collectionsChanged(Collection<String> collections) throws KeeperException, InterruptedException {
- synchronized (shardLeaderWatches) {
- for(String collection: collections) {
- if(!shardLeaderWatches.containsKey(collection)) {
- shardLeaderWatches.put(collection, new HashMap<String,ShardLeaderWatcher>());
- addShardLeadersWatch(collection);
- }
- }
- //XXX not handling delete collections..
- }
- }
/**
- * Add a watch for node containing shard leaders for a collection
- * @param collection
- * @throws KeeperException
- * @throws InterruptedException
+ * Get queue that can be used to send messages to Overseer.
*/
- private void addShardLeadersWatch(final String collection) throws KeeperException,
- InterruptedException {
-
- zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null), zkClient);
-
- final List<String> leaderNodes = zkClient.getChildren(
- ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- try {
- List<String> leaderNodes = zkClient.getChildren(
- ZkStateReader.getShardLeadersPath(collection, null), this, true);
-
- processLeaderNodesChanged(collection, leaderNodes);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- SolrException.log(log, "", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- }
- }
- }, true);
-
- processLeaderNodesChanged(collection, leaderNodes);
- }
-
- /**
- * Process change in shard leaders. Make sure we have watches for each leader.
- */
- private void processLeaderNodesChanged(final String collection, final Collection<String> shardIds) {
- if(log.isInfoEnabled()) {
- log.info("Leader nodes changed for collection: " + collection + " nodes now:" + shardIds);
- }
-
- Map<String, ShardLeaderWatcher> watches = shardLeaderWatches.get(collection);
- Set<String> currentWatches = new HashSet<String>();
- currentWatches.addAll(watches.keySet());
-
- Set<String> newLeaders = complement(shardIds, currentWatches);
-
- Set<String> lostLeaders = complement(currentWatches, shardIds);
- //remove watches for lost shards
- for (String shardId : lostLeaders) {
- ShardLeaderWatcher watcher = watches.remove(shardId);
- if (watcher != null) {
- watcher.close();
- }
- }
-
- //add watches for the new shards
- for(String shardId: newLeaders) {
- try {
- ShardLeaderWatcher watcher = new ShardLeaderWatcher(shardId, collection, zkClient, this);
- watches.put(shardId, watcher);
- } catch (KeeperException e) {
- log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
- }
+ public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
+ createOverseerNode(zkClient);
+ return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue", null);
+ }
+
+ /* Internal queue, not to be used outside of Overseer */
+ static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
+ createOverseerNode(zkClient);
+ return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue-work", null);
+ }
+
+ private static void createOverseerNode(final SolrZkClient zkClient) {
+ try {
+ zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException e) {
+ //ok
+ } catch (InterruptedException e) {
+ log.error("Could not create Overseer node: " + e.getClass() + ":" + e.getMessage());
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (KeeperException e) {
+ log.error("Could not create Overseer node: " + e.getClass() + ":" + e.getMessage());
+ throw new RuntimeException(e);
}
}
- private void addLiveNodesWatch() throws KeeperException,
- InterruptedException {
- List<String> liveNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(
- ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- try {
- List<String> liveNodes = zkClient.getChildren(
- ZkStateReader.LIVE_NODES_ZKNODE, this, true);
- synchronized (nodeStateWatches) {
- processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
- }
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- SolrException.log(log, "", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- }
- }
- }, true);
- }
- });
-
- processLiveNodesChanged(Collections.<String>emptySet(), liveNodes);
- }
-
- private void processLiveNodesChanged(Collection<String> oldLiveNodes,
- Collection<String> liveNodes) throws InterruptedException, KeeperException {
-
- Set<String> upNodes = complement(liveNodes, oldLiveNodes);
- if (upNodes.size() > 0) {
- addNodeStateWatches(upNodes);
- }
-
- Set<String> downNodes = complement(oldLiveNodes, liveNodes);
- for(String node: downNodes) {
- synchronized (nodeStateWatches) {
- NodeStateWatcher watcher = nodeStateWatches.remove(node);
- }
- log.debug("Removed NodeStateWatcher for node:" + node);
- }
- }
-
- private void addNodeStateWatches(Set<String> nodeNames) throws InterruptedException, KeeperException {
-
- for (String nodeName : nodeNames) {
- final String path = STATES_NODE + "/" + nodeName;
- synchronized (nodeStateWatches) {
- if (!nodeStateWatches.containsKey(nodeName)) {
- zkCmdExecutor.ensureExists(path, zkClient);
- nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
- log.debug("Added NodeStateWatcher for node " + nodeName);
- } else {
- log.debug("watch already added");
- }
- }
- }
- }
-
- private Set<String> complement(Collection<String> next,
- Collection<String> prev) {
- Set<String> downCollections = new HashSet<String>();
- downCollections.addAll(next);
- downCollections.removeAll(prev);
- return downCollections;
- }
-
- @Override
- public void coreChanged(final String nodeName, final Set<CoreState> states)
- throws KeeperException, InterruptedException {
- log.info("Core change pooled: " + nodeName + " states:" + states);
- for (CoreState state : states) {
- fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
- }
- }
-
- @Override
- public void coreDeleted(String nodeName, Collection<CoreState> states)
- throws KeeperException, InterruptedException {
- for (CoreState state : states) {
- fifo.add(new CloudStateUpdateRequest(Op.CoreDeleted, state.getCollectionName(), state.getCoreNodeName()));
- }
- }
-
- public static void createClientNodes(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
- final String node = STATES_NODE + "/" + nodeName;
- if (log.isInfoEnabled()) {
- log.info("creating node:" + node);
- }
-
- ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
- zkCmdExecutor.ensureExists(node, zkClient);
- }
-
- @Override
- public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
- String coreUrl = props.getCoreUrl();
- log.info("Leader change pooled: " + coreUrl);
- fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, coreUrl));
- }
-
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Fri Jun 8 05:21:28 2012
@@ -36,6 +36,7 @@ import org.apache.solr.common.cloud.Safe
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
@@ -52,6 +53,7 @@ import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,11 +100,10 @@ public class RecoveryStrategy extends Th
private void recoveryFailed(final SolrCore core,
final ZkController zkController, final String baseUrl,
- final String shardZkNodeName, final CoreDescriptor cd) {
+ final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
SolrException.log(log, "Recovery failed - I give up.");
try {
- zkController.publishAsRecoveryFailed(baseUrl, cd,
- shardZkNodeName, core.getName());
+ zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
} finally {
close();
}
@@ -208,7 +209,18 @@ public class RecoveryStrategy extends Th
log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
- doRecovery(core);
+ try {
+ doRecovery(core);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ }
} finally {
if (core != null) core.close();
SolrRequestInfo.clearRequestInfo();
@@ -216,7 +228,7 @@ public class RecoveryStrategy extends Th
}
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
- public void doRecovery(SolrCore core) {
+ public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
boolean replayed = false;
boolean successfulRecovery = false;
@@ -327,8 +339,8 @@ public class RecoveryStrategy extends Th
// }
// sync success - register as active and return
- zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
- coreZkNodeName, coreName);
+ zkController.publish(core.getCoreDescriptor(),
+ ZkStateReader.ACTIVE);
successfulRecovery = true;
close = true;
return;
@@ -352,8 +364,7 @@ public class RecoveryStrategy extends Th
log.info("Recovery was successful - registering as Active");
// if there are pending recovery requests, don't advert as active
- zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
- coreZkNodeName, coreName);
+ zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
close = true;
successfulRecovery = true;
} catch (InterruptedException e) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Jun 8 05:21:28 2012
@@ -39,7 +39,6 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -85,6 +84,7 @@ public final class ZkController {
private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
+ private final DistributedQueue overseerStatusQueue;
// package private for tests
@@ -93,11 +93,6 @@ public final class ZkController {
public final static String COLLECTION_PARAM_PREFIX="collection.";
public final static String CONFIGNAME_PROP="configName";
- private Map<String, CoreState> coreStates = new HashMap<String, CoreState>(); // key is the local core name
- private long coreStatesVersion; // bumped by 1 each time we serialize coreStates... sync on coreStates
- private long coreStatesPublishedVersion; // last version published to ZK... sync on coreStatesPublishLock
- private Object coreStatesPublishLock = new Object(); // only publish one at a time
-
private final Map<String, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
private SolrZkClient zkClient;
@@ -210,7 +205,7 @@ public final class ZkController {
// seems we dont need to do this again...
//Overseer.createClientNodes(zkClient, getNodeName());
- ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+ ElectionContext context = new OverseerElectionContext(getNodeName(), zkStateReader);
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -223,8 +218,7 @@ public final class ZkController {
final String coreZkNodeName = getNodeName() + "_"
+ descriptor.getName();
try {
- publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
- descriptor.getName());
+ publish(descriptor, ZkStateReader.DOWN);
waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) {
SolrException.log(log, "", e);
@@ -261,6 +255,7 @@ public final class ZkController {
});
+ this.overseerStatusQueue = Overseer.getInQueue(zkClient);
cmdExecutor = new ZkCmdExecutor();
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
@@ -302,13 +297,6 @@ public final class ZkController {
return zkStateReader.getCloudState();
}
- /** @return the CoreState for the core, which may not yet be visible to ZooKeeper or other nodes in the cluster */
- public CoreState getCoreState(String coreName) {
- synchronized (coreStates) {
- return coreStates.get(coreName);
- }
- }
-
/**
* @param zkConfigName
* @param fileName
@@ -387,14 +375,11 @@ public final class ZkController {
// makes nodes zkNode
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
- Overseer.createClientNodes(zkClient, getNodeName());
createEphemeralLiveNode();
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
- syncNodeState();
-
overseerElector = new LeaderElector(zkClient);
- ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+ ElectionContext context = new OverseerElectionContext(getNodeName(), zkStateReader);
overseerElector.setup(context);
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -416,27 +401,6 @@ public final class ZkController {
}
}
-
- /*
- * sync internal state with zk on startup
- */
- private void syncNodeState() throws KeeperException, InterruptedException {
- log.debug("Syncing internal state with zk. Current: " + coreStates);
- final String path = Overseer.STATES_NODE + "/" + getNodeName();
-
- final byte[] data = zkClient.getData(path, null, null, true);
-
- if (data != null) {
- CoreState[] states = CoreState.load(data);
- synchronized (coreStates) {
- coreStates.clear(); // TODO: should we do this?
- for(CoreState coreState: states) {
- coreStates.put(coreState.getCoreName(), coreState);
- }
- }
- }
- log.debug("after sync: " + coreStates);
- }
public boolean isConnected() {
return zkClient.isConnected();
@@ -640,7 +604,7 @@ public final class ZkController {
boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
collection, coreZkNodeName, shardId, leaderProps, core, cc);
if (!didRecovery) {
- publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+ publish(desc, ZkStateReader.ACTIVE);
}
} finally {
if (core != null) {
@@ -648,7 +612,7 @@ public final class ZkController {
}
}
} else {
- publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+ publish(desc, ZkStateReader.ACTIVE);
}
// make sure we have an update cluster state right away
@@ -760,50 +724,34 @@ public final class ZkController {
return baseURL;
}
-
- void publishAsActive(String shardUrl,
- final CoreDescriptor cd, String shardZkNodeName, String coreName) {
- Map<String,String> finalProps = new HashMap<String,String>();
- finalProps.put(ZkStateReader.BASE_URL_PROP, shardUrl);
- finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
-
- publishState(cd, shardZkNodeName, coreName, finalProps);
- }
-
- public void publish(CoreDescriptor cd, String state) {
- Map<String,String> finalProps = new HashMap<String,String>();
- finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
- finalProps.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
- finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- finalProps.put(ZkStateReader.STATE_PROP, state);
- publishState(cd, getNodeName() + "_" + cd.getName(),
- cd.getName(), finalProps);
- }
-
- void publishAsDown(String baseUrl,
- final CoreDescriptor cd, String shardZkNodeName, String coreName) {
- Map<String,String> finalProps = new HashMap<String,String>();
- finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
- finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
-
- publishState(cd, shardZkNodeName, coreName, finalProps);
- }
-
- void publishAsRecoveryFailed(String baseUrl,
- final CoreDescriptor cd, String shardZkNodeName, String coreName) {
- Map<String,String> finalProps = new HashMap<String,String>();
- finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
- finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERY_FAILED);
- publishState(cd, shardZkNodeName, coreName, finalProps);
+ /**
+ * Publish core state to overseer.
+ * @param cd
+ * @param state
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void publish(final CoreDescriptor cd, final String state) throws KeeperException, InterruptedException {
+ //System.out.println(Thread.currentThread().getStackTrace()[3]);
+ Integer numShards = cd.getCloudDescriptor().getNumShards();
+ if (numShards == null) { //XXX sys prop hack
+ numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
+ }
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.STATE_PROP, state,
+ ZkStateReader.BASE_URL_PROP, getBaseUrl(),
+ ZkStateReader.CORE_NAME_PROP, cd.getName(),
+ ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles(),
+ ZkStateReader.NODE_NAME_PROP, getNodeName(),
+ ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
+ ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor()
+ .getCollectionName(), ZkStateReader.STATE_PROP, state,
+ ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
+ : null);
+ overseerStatusQueue.offer(ZkStateReader.toJSON(m));
}
-
private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
final CloudState state, final String shardZkNodeName) {
@@ -826,10 +774,12 @@ public final class ZkController {
*/
public void unregister(String coreName, CloudDescriptor cloudDesc)
throws InterruptedException, KeeperException {
- synchronized (coreStates) {
- coreStates.remove(coreName);
- }
- publishState();
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+ "deletecore", ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.NODE_NAME_PROP, getNodeName(),
+ ZkStateReader.COLLECTION_PROP, cloudDesc.getCollectionName());
+ overseerStatusQueue.offer(ZkStateReader.toJSON(m));
+
final String zkNodeName = getNodeName() + "_" + coreName;
ElectionContext context = electionContexts.remove(zkNodeName);
if (context != null) {
@@ -993,83 +943,6 @@ public final class ZkController {
return zkStateReader;
}
-
- private void publishState(CoreDescriptor cd, String shardZkNodeName, String coreName,
- Map<String,String> props) {
- CloudDescriptor cloudDesc = cd.getCloudDescriptor();
- if (cloudDesc.getRoles() != null) {
- props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
- }
-
- if (cloudDesc.getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getCloudState(), shardZkNodeName)) {
- // publish with no shard id so we are assigned one, and then look for it
- doPublish(shardZkNodeName, coreName, props, cloudDesc);
- String shardId;
- try {
- shardId = doGetShardIdProcess(coreName, cloudDesc);
- } catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
- }
- cloudDesc.setShardId(shardId);
- }
-
-
- if (!props.containsKey(ZkStateReader.SHARD_ID_PROP) && cloudDesc.getShardId() != null) {
- props.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
- }
-
- doPublish(shardZkNodeName, coreName, props, cloudDesc);
- }
-
-
- private void doPublish(String shardZkNodeName, String coreName,
- Map<String,String> props, CloudDescriptor cloudDesc) {
- Integer numShards = cloudDesc.getNumShards();
- if (numShards == null) {
- numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
- }
- CoreState coreState = new CoreState(coreName,
- cloudDesc.getCollectionName(), props, numShards);
-
- synchronized (coreStates) {
- coreStates.put(coreName, coreState);
- }
-
- publishState();
- }
-
- private void publishState() {
- final String nodePath = "/node_states/" + getNodeName();
-
- long version;
- byte[] coreStatesData;
- synchronized (coreStates) {
- version = ++coreStatesVersion;
- coreStatesData = ZkStateReader.toJSON(coreStates.values());
- }
-
- // if multiple threads are trying to publish state, make sure that we never write
- // an older version after a newer version.
- synchronized (coreStatesPublishLock) {
- try {
- if (version < coreStatesPublishedVersion) {
- log.info("Another thread already published a newer coreStates: ours="+version + " lastPublished=" + coreStatesPublishedVersion);
- } else {
- zkClient.setData(nodePath, coreStatesData, true);
- coreStatesPublishedVersion = version; // put it after so it won't be set if there's an exception
- }
- } catch (KeeperException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "could not publish node state", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "could not publish node state", e);
- }
- }
- }
-
private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor)
throws InterruptedException {
final String shardZkNodeName = getNodeName() + "_" + coreName;
@@ -1086,6 +959,7 @@ public final class ZkController {
Thread.currentThread().interrupt();
}
}
+
throw new SolrException(ErrorCode.SERVER_ERROR,
"Could not get shard_id for core: " + coreName);
}
@@ -1106,14 +980,30 @@ public final class ZkController {
}
}
+ private String getCoreNodeName(CoreDescriptor descriptor){
+ return getNodeName() + "_"
+ + descriptor.getName();
+ }
+
public static void uploadConfigDir(SolrZkClient zkClient, File dir, String configName) throws IOException, KeeperException, InterruptedException {
uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
}
- public void preRegister(CoreDescriptor cd) {
+ public void preRegister(CoreDescriptor cd) throws KeeperException, InterruptedException {
// before becoming available, make sure we are not live and active
// this also gets us our assigned shard id if it was not specified
publish(cd, ZkStateReader.DOWN);
+ String shardZkNodeName = getCoreNodeName(cd);
+ if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getCloudState(), shardZkNodeName)) {
+ String shardId;
+ try {
+ shardId = doGetShardIdProcess(cd.getName(), cd.getCloudDescriptor());
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+ }
+ cd.getCloudDescriptor().setShardId(shardId);
+ }
+
}
private ZkCoreNodeProps waitForLeaderToSeeDownState(
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Jun 8 05:21:28 2012
@@ -629,7 +629,18 @@ public class CoreContainer
if (zkController != null) {
// this happens before we can receive requests
- zkController.preRegister(core.getCoreDescriptor());
+ try {
+ zkController.preRegister(core.getCoreDescriptor());
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
}
SolrCore old = null;
@@ -662,7 +673,6 @@ public class CoreContainer
}
}
-
private void registerInZk(SolrCore core) {
if (zkController != null) {
try {
@@ -676,7 +686,18 @@ public class CoreContainer
} catch (Exception e) {
// if register fails, this is really bad - close the zkController to
// minimize any damage we can cause
- zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
+ try {
+ zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
+ } catch (KeeperException e1) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Fri Jun 8 05:21:28 2012
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.Atomi
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -42,7 +41,6 @@ import org.apache.solr.common.cloud.ZkSt
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
@@ -62,7 +60,6 @@ public class OverseerTest extends SolrTe
private final String nodeName;
private final String collection;
private final LeaderElector elector;
- private final Map<String, CoreState> coreStates = Collections.synchronizedMap(new HashMap<String, CoreState>());
private final Map<String, ElectionContext> electionContext = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
public MockZKController(String zkAddress, String nodeName, String collection) throws InterruptedException, TimeoutException, IOException, KeeperException {
@@ -71,7 +68,6 @@ public class OverseerTest extends SolrTe
zkClient = new SolrZkClient(zkAddress, TIMEOUT);
zkStateReader = new ZkStateReader(zkClient);
zkStateReader.createClusterStateWatchersAndUpdate();
- Overseer.createClientNodes(zkClient, nodeName);
// live node
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
@@ -104,29 +100,29 @@ public class OverseerTest extends SolrTe
public void publishState(String coreName, String stateName, int numShards)
throws KeeperException, InterruptedException, IOException {
if (stateName == null) {
- coreStates.remove(coreName);
ElectionContext ec = electionContext.remove(coreName);
if (ec != null) {
ec.cancelElection();
}
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "deletecore",
+ ZkStateReader.NODE_NAME_PROP, nodeName,
+ ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.COLLECTION_PROP, collection);
+ DistributedQueue q = Overseer.getInQueue(zkClient);
+ q.offer(ZkStateReader.toJSON(m));
+
} else {
- HashMap<String,String> coreProps = new HashMap<String,String>();
- coreProps.put(ZkStateReader.STATE_PROP, stateName);
- coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName);
- coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- coreProps.put(ZkStateReader.COLLECTION_PROP, collection);
- coreProps.put(ZkStateReader.BASE_URL_PROP, "http://" + nodeName
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.STATE_PROP, stateName,
+ ZkStateReader.NODE_NAME_PROP, nodeName,
+ ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
+ ZkStateReader.BASE_URL_PROP, "http://" + nodeName
+ "/solr/");
- CoreState state = new CoreState(coreName, collection, coreProps,
- numShards);
- coreStates.remove(coreName);
- coreStates.put(coreName, state);
- }
- final String statePath = Overseer.STATES_NODE + "/" + nodeName;
- zkClient.setData(
- statePath,
- ZkStateReader.toJSON(coreStates.values().toArray(
- new CoreState[coreStates.size()])), true);
+ DistributedQueue q = Overseer.getInQueue(zkClient);
+ q.offer(ZkStateReader.toJSON(m));
+ }
for (int i = 0; i < 30; i++) {
String shardId = getShardId(coreName);
@@ -196,8 +192,6 @@ public class OverseerTest extends SolrTe
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
- System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
-
zkController = new ZkController(null, server.getZkAddress(), TIMEOUT, 10000,
"localhost", "8983", "solr", new CurrentCoreDescriptorProvider() {
@@ -216,6 +210,7 @@ public class OverseerTest extends SolrTe
for (int i = 0; i < numShards; i++) {
CloudDescriptor collection1Desc = new CloudDescriptor();
+ collection1Desc.setNumShards(3);
collection1Desc.setCollectionName("collection1");
CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
desc1.setCloudDescriptor(collection1Desc);
@@ -238,16 +233,13 @@ public class OverseerTest extends SolrTe
assertNotNull(reader.getLeaderUrl("collection1", "shard3", 15000));
} finally {
- System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
System.clearProperty("bootstrap_confdir");
if (DEBUG) {
if (zkController != null) {
zkClient.printLayoutToStdOut();
}
}
- if (zkClient != null) {
- zkClient.close();
- }
+ close(zkClient);
if (zkController != null) {
zkController.close();
}
@@ -266,6 +258,7 @@ public class OverseerTest extends SolrTe
ZkTestServer server = new ZkTestServer(zkDir);
+ System.setProperty(ZkStateReader.NUM_SHARDS_PROP, Integer.toString(sliceCount));
SolrZkClient zkClient = null;
ZkStateReader reader = null;
final ZkController[] controllers = new ZkController[nodeCount];
@@ -281,8 +274,6 @@ public class OverseerTest extends SolrTe
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
- System.setProperty(ZkStateReader.NUM_SHARDS_PROP, Integer.valueOf(sliceCount).toString());
-
for (int i = 0; i < nodeCount; i++) {
controllers[i] = new ZkController(null, server.getZkAddress(), TIMEOUT, 10000,
@@ -313,6 +304,7 @@ public class OverseerTest extends SolrTe
public void run() {
final CloudDescriptor collection1Desc = new CloudDescriptor();
collection1Desc.setCollectionName("collection1");
+ collection1Desc.setNumShards(sliceCount);
final String coreName = "core" + slot;
@@ -402,12 +394,8 @@ public class OverseerTest extends SolrTe
zkClient.printLayoutToStdOut();
}
}
- if (zkClient != null) {
- zkClient.close();
- }
- if (reader != null) {
- reader.close();
- }
+ close(zkClient);
+ close(reader);
for (int i = 0; i < controllers.length; i++)
if (controllers[i] != null) {
controllers[i].close();
@@ -457,36 +445,23 @@ public class OverseerTest extends SolrTe
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient.makePath("/live_nodes", true);
- //live node
- String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
- zkClient.makePath(nodePath,CreateMode.EPHEMERAL, true);
-
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
- Overseer.createClientNodes(zkClient, "node1");
-
overseerClient = electNewOverseer(server.getZkAddress());
- HashMap<String, String> coreProps = new HashMap<String,String>();
- coreProps.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr");
- coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1");
- coreProps.put(ZkStateReader.CORE_NAME_PROP, "core1");
- coreProps.put(ZkStateReader.ROLES_PROP, "");
- coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
- CoreState state = new CoreState("core1", "collection1", coreProps, 2);
+ DistributedQueue q = Overseer.getInQueue(zkClient);
- nodePath = "/node_states/node1";
-
- try {
- zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
- } catch (KeeperException ke) {
- if(ke.code()!=Code.NODEEXISTS) {
- throw ke;
- }
- }
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.COLLECTION_PROP, "collection1",
+ ZkStateReader.CORE_NAME_PROP, "core1",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+
+ q.offer(ZkStateReader.toJSON(m));
- zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
waitForCollections(reader, "collection1");
assertEquals(reader.getCloudState().toString(), ZkStateReader.RECOVERING,
@@ -494,27 +469,24 @@ public class OverseerTest extends SolrTe
.get("node1_core1").get(ZkStateReader.STATE_PROP));
//publish node state (active)
- coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
-
- coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
- state = new CoreState("core1", "collection1", coreProps, 2);
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.COLLECTION_PROP, "collection1",
+ ZkStateReader.CORE_NAME_PROP, "core1",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
- zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
+ q.offer(ZkStateReader.toJSON(m));
verifyStatus(reader, ZkStateReader.ACTIVE);
} finally {
- if (zkClient != null) {
- zkClient.close();
- }
- if (overseerClient != null) {
- overseerClient.close();
- }
+ close(zkClient);
+ close(overseerClient);
- if (reader != null) {
- reader.close();
- }
+ close(reader);
server.shutdown();
}
}
@@ -608,24 +580,16 @@ public class OverseerTest extends SolrTe
version = getCloudStateVersion(controllerClient);
mockController.publishState("core1", null,1);
while(version == getCloudStateVersion(controllerClient));
- Thread.sleep(100);
+ Thread.sleep(500);
assertEquals("Shard count does not match", 0, reader.getCloudState()
.getSlice("collection1", "shard1").getShards().size());
} finally {
- if (mockController != null) {
- mockController.close();
- }
+ close(mockController);
- if (overseerClient != null) {
- overseerClient.close();
- }
- if (controllerClient != null) {
- controllerClient.close();
- }
- if (reader != null) {
- reader.close();
- }
+ close(overseerClient);
+ close(controllerClient);
+ close(reader);
server.shutdown();
}
}
@@ -719,18 +683,10 @@ public class OverseerTest extends SolrTe
killerThread.join();
}
}
- if (mockController != null) {
- mockController.close();
- }
- if (mockController2 != null) {
- mockController2.close();
- }
- if (controllerClient != null) {
- controllerClient.close();
- }
- if (reader != null) {
- reader.close();
- }
+ close(mockController);
+ close(mockController2);
+ close(controllerClient);
+ close(reader);
server.shutdown();
}
}
@@ -791,19 +747,10 @@ public class OverseerTest extends SolrTe
assertEquals("Shard was found in more than 1 times in CloudState", 1,
numFound);
} finally {
- if (overseerClient != null) {
- overseerClient.close();
- }
- if (mockController != null) {
- mockController.close();
- }
-
- if (controllerClient != null) {
- controllerClient.close();
- }
- if (reader != null) {
- reader.close();
- }
+ close(overseerClient);
+ close(mockController);
+ close(controllerClient);
+ close(reader);
server.shutdown();
}
}
@@ -842,23 +789,101 @@ public class OverseerTest extends SolrTe
assertEquals("Slicecount does not match", 12, reader.getCloudState().getSlices("collection1").size());
} finally {
- if (overseerClient != null) {
- overseerClient.close();
- }
- if (mockController != null) {
- mockController.close();
- }
+ close(overseerClient);
+ close(mockController);
+ close(controllerClient);
+ close(reader);
+ server.shutdown();
+ }
+ }
- if (controllerClient != null) {
- controllerClient.close();
- }
- if (reader != null) {
- reader.close();
+ private void close(MockZKController mockController) {
+ if (mockController != null) {
+ mockController.close();
+ }
+ }
+
+
+ @Test
+ public void testReplay() throws Exception{
+ String zkDir = dataDir.getAbsolutePath() + File.separator
+ + "zookeeper/server1/data";
+ ZkTestServer server = new ZkTestServer(zkDir);
+ SolrZkClient zkClient = null;
+ SolrZkClient overseerClient = null;
+ ZkStateReader reader = null;
+
+ try {
+ server.run();
+ zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+ zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
+
+ reader = new ZkStateReader(zkClient);
+ reader.createClusterStateWatchersAndUpdate();
+ //prepopulate the work queue with some items to emulate a previous overseer that died before persisting state
+ DistributedQueue queue = Overseer.getInternalQueue(zkClient);
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.SHARD_ID_PROP, "s1",
+ ZkStateReader.COLLECTION_PROP, "collection1",
+ ZkStateReader.CORE_NAME_PROP, "core1",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+ queue.offer(ZkStateReader.toJSON(m));
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.SHARD_ID_PROP, "s1",
+ ZkStateReader.COLLECTION_PROP, "collection1",
+ ZkStateReader.CORE_NAME_PROP, "core2",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+ queue.offer(ZkStateReader.toJSON(m));
+
+ overseerClient = electNewOverseer(server.getZkAddress());
+
+ //submit to proper queue
+ queue = Overseer.getInQueue(zkClient);
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+ ZkStateReader.NODE_NAME_PROP, "node1",
+ ZkStateReader.SHARD_ID_PROP, "s1",
+ ZkStateReader.COLLECTION_PROP, "collection1",
+ ZkStateReader.CORE_NAME_PROP, "core3",
+ ZkStateReader.ROLES_PROP, "",
+ ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+ queue.offer(ZkStateReader.toJSON(m));
+
+ for(int i=0;i<100;i++) {
+ Slice s = reader.getCloudState().getSlice("collection1", "s1");
+ if(s!=null && s.getShards().size()==3) break;
+ Thread.sleep(100);
}
+ assertNotNull(reader.getCloudState().getSlice("collection1", "s1"));
+ assertEquals(3, reader.getCloudState().getSlice("collection1", "s1").getShards().size());
+ } finally {
+ close(overseerClient);
+ close(zkClient);
+ close(reader);
server.shutdown();
}
}
+ private void close(ZkStateReader reader) {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ private void close(SolrZkClient overseerClient) throws InterruptedException {
+ if (overseerClient != null) {
+ overseerClient.close();
+ }
+ }
+
private int getCloudStateVersion(SolrZkClient controllerClient)
throws KeeperException, InterruptedException {
return controllerClient.exists(ZkStateReader.CLUSTER_STATE, null, false).getVersion();
@@ -870,9 +895,10 @@ public class OverseerTest extends SolrTe
SolrZkClient zkClient = new SolrZkClient(address, TIMEOUT);
ZkStateReader reader = new ZkStateReader(zkClient);
LeaderElector overseerElector = new LeaderElector(zkClient);
- ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, reader);
+ ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), reader);
overseerElector.setup(ec);
overseerElector.joinElection(ec);
return zkClient;
}
+
}
\ No newline at end of file
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Fri Jun 8 05:21:28 2012
@@ -89,6 +89,19 @@ public class CloudState implements JSONW
if (collectionLeaders == null) return null;
return collectionLeaders.get(shard);
}
+
+ /**
+ * Get shard properties or null if shard is not found.
+ */
+ public ZkNodeProps getShardProps(final String collection, final String coreNodeName) {
+ Map<String, Slice> slices = getSlices(collection);
+ for(Slice slice: slices.values()) {
+ if(slice.getShards().get(coreNodeName)!=null) {
+ return slice.getShards().get(coreNodeName);
+ }
+ }
+ return null;
+ }
private void addRangeInfos(Set<String> collections) {
for (String collection : collections) {