You are viewing a plain-text version of this content. (The canonical link that originally appeared here was lost in the plain-text conversion.)
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/16 18:07:45 UTC
svn commit: r1215227 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Fri Dec 16 17:07:44 2011
New Revision: 1215227
URL: http://svn.apache.org/viewvc?rev=1215227&view=rev
Log:
commit sami's latest overseer patch
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Fri Dec 16 17:07:44 2011
@@ -130,7 +130,7 @@ public class LeaderElector {
}
protected void runIamLeaderProcess(final ElectionContext context) throws KeeperException,
- InterruptedException, IOException {
+ InterruptedException {
String currentLeaderZkPath = context.electionPath
+ LEADER_NODE;
// TODO: leader election tests do not currently set the props
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java Fri Dec 16 17:07:44 2011
@@ -40,8 +40,8 @@ public class NodeStateWatcher implements
private static Logger log = LoggerFactory.getLogger(NodeStateWatcher.class);
public static interface NodeStateChangeListener {
- void coreCreated(String shardZkNodeName, Set<CoreState> cores) throws KeeperException;
- void coreChanged(String nodeName, Set<CoreState> cores) throws KeeperException;
+ void coreCreated(String shardZkNodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
+ void coreChanged(String nodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
}
private final SolrZkClient zkClient;
@@ -120,6 +120,9 @@ public class NodeStateWatcher implements
listener.coreCreated(nodeName, Collections.unmodifiableSet(newCores));
} catch (KeeperException e) {
log.warn("Could not talk to ZK", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Could not talk to ZK", e);
}
}
@@ -128,6 +131,9 @@ public class NodeStateWatcher implements
listener.coreChanged(nodeName, Collections.unmodifiableSet(changedCores));
} catch (KeeperException e) {
log.warn("Could not talk to ZK", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Could not talk to ZK", e);
}
}
} else {
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Dec 16 17:07:44 2011
@@ -32,11 +32,9 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.cloud.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.KeeperException.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +43,8 @@ import org.slf4j.LoggerFactory;
*/
public class Overseer implements NodeStateChangeListener {
+ public static final String ASSIGNMENTS_NODE = "/node_assignments";
+ public static final String STATES_NODE = "/node_states";
private static Logger log = LoggerFactory.getLogger(Overseer.class);
private final SolrZkClient zkClient;
@@ -57,23 +57,9 @@ public class Overseer implements NodeSta
log.info("Constructing new Overseer");
this.zkClient = zkClient;
this.reader = reader;
- createZkNodes(zkClient);
createWatches();
}
- public static void createZkNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
- //create assignments node if it does not exist
- if (!zkClient.exists("/node_assignments")) {
- try {
- zkClient.makePath("/node_assignments", CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- throw e;
- }
- }
- }
- }
-
public synchronized void createWatches()
throws KeeperException, InterruptedException {
// We need to fetch the current cluster state and the set of live nodes
@@ -148,7 +134,7 @@ public class Overseer implements NodeSta
private void addNodeStateWatches(Set<String> nodeNames) throws InterruptedException, KeeperException {
for (String nodeName : nodeNames) {
- String path = "/node_states/" + nodeName;
+ final String path = STATES_NODE + "/" + nodeName;
synchronized (nodeStateWatches) {
if (!nodeStateWatches.containsKey(nodeName)) {
try {
@@ -175,12 +161,14 @@ public class Overseer implements NodeSta
/**
* Try to assign core to the cluster
* @throws KeeperException
+ * @throws InterruptedException
*/
- private void updateState(String nodeName, CoreState coreState) throws KeeperException {
+ private void updateState(String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
String collection = coreState.getCollectionName();
String coreName = coreState.getCoreName();
synchronized (reader.getUpdateLock()) {
+ reader.updateCloudState(true); //get fresh copy of the state
String shardId;
CloudState state = reader.getCloudState();
if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
@@ -221,7 +209,7 @@ public class Overseer implements NodeSta
}
@Override
- public void coreCreated(String nodeName, Set<CoreState> states) throws KeeperException {
+ public void coreCreated(String nodeName, Set<CoreState> states) throws KeeperException, InterruptedException {
log.debug("Cores created: " + nodeName + " states:" +states);
for (CoreState state : states) {
updateState(nodeName, state);
@@ -241,7 +229,6 @@ public class Overseer implements NodeSta
ArrayList<CoreAssignment> assignments = new ArrayList<CoreAssignment>();
for(CoreState coreState: states) {
final String coreName = coreState.getCoreName();
- final String collection = coreState.getCollectionName();
HashMap<String, String> coreProperties = new HashMap<String, String>();
Map<String, Slice> slices = cloudState.getSlices(coreState.getCollectionName());
for(Entry<String, Slice> entry: slices.entrySet()) {
@@ -250,22 +237,13 @@ public class Overseer implements NodeSta
coreProperties.put(ZkStateReader.SHARD_ID_PROP, entry.getKey());
}
}
- CoreAssignment assignment = new CoreAssignment(coreName, collection, coreProperties);
+ CoreAssignment assignment = new CoreAssignment(coreName, coreProperties);
assignments.add(assignment);
}
//serialize
byte[] content = ZkStateReader.toJSON(assignments);
- final String nodeName = "/node_assignments/" + node;
- if (!zkClient.exists(nodeName)) {
- try {
- zkClient.makePath(nodeName);
- } catch (KeeperException ke) {
- if (ke.code() != Code.NODEEXISTS) {
- throw ke;
- }
- }
- }
+ final String nodeName = ASSIGNMENTS_NODE + "/" + node;
zkClient.setData(nodeName, content);
}
@@ -278,10 +256,31 @@ public class Overseer implements NodeSta
}
@Override
- public void coreChanged(String nodeName, Set<CoreState> states) throws KeeperException {
+ public void coreChanged(String nodeName, Set<CoreState> states) throws KeeperException, InterruptedException {
log.debug("Cores changed: " + nodeName + " states:" + states);
for (CoreState state : states) {
updateState(nodeName, state);
}
}
+
+ public static void createClientNodes(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
+ createZkNode(zkClient, STATES_NODE + "/" + nodeName);
+ createZkNode(zkClient, ASSIGNMENTS_NODE + "/" + nodeName);
+ }
+
+ private static void createZkNode(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
+
+ if (log.isInfoEnabled()) {
+ log.info("creating node:" + nodeName);
+ }
+
+ try {
+ if (!zkClient.exists(nodeName)) {
+ zkClient.makePath(nodeName, CreateMode.PERSISTENT, null);
+ }
+
+ } catch (NodeExistsException e) {
+ // it's ok
+ }
+ }
}
\ No newline at end of file
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java Fri Dec 16 17:07:44 2011
@@ -17,13 +17,9 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Overseer Elector.
@@ -31,7 +27,6 @@ import org.slf4j.LoggerFactory;
public class OverseerElector extends LeaderElector {
private final SolrZkClient client;
private final ZkStateReader reader;
- private static Logger log = LoggerFactory.getLogger(OverseerElector.class);
public OverseerElector(SolrZkClient client, ZkStateReader stateReader) {
super(client);
@@ -40,21 +35,8 @@ public class OverseerElector extends Lea
}
@Override
- protected void runIamLeaderProcess(ElectionContext context) {
- try {
- new Overseer(client, reader);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("Cannot run overseer leader process, Solr cannot talk to ZK");
- return;
- }
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Could not run leader process", e);
- }
+ protected void runIamLeaderProcess(ElectionContext context) throws KeeperException, InterruptedException{
+ new Overseer(client, reader);
}
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Dec 16 17:07:44 2011
@@ -302,9 +302,9 @@ public final class ZkController {
}
}
+ Overseer.createClientNodes(zkClient, getNodeName());
createEphemeralLiveNode();
setUpCollectionsNode();
- createAssignmentsNode();
byte[] assignments = zkClient.getData(getAssignmentsNode(), new Watcher(){
@@ -359,9 +359,12 @@ public final class ZkController {
}
-
private String getAssignmentsNode() {
- return "/node_assignments/" + getNodeName();
+ return Overseer.ASSIGNMENTS_NODE + "/" + getNodeName();
+ }
+
+ private String getStatesNode() {
+ return Overseer.STATES_NODE + "/" + getNodeName();
}
private void createEphemeralLiveNode() throws KeeperException,
@@ -476,17 +479,17 @@ public final class ZkController {
props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
props.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
if(shardId!=null) {
- props.put("shard_id", shardId);
+ props.put(ZkStateReader.SHARD_ID_PROP, shardId);
}
if (shardId == null && getShardId(desc, state, shardZkNodeName)) {
publishState(cloudDesc, shardZkNodeName, props); //need to publish state to get overseer assigned id
shardId = doGetShardIdProcess(coreName, cloudDesc);
cloudDesc.setShardId(shardId);
- props.put("shard_id", shardId);
+ props.put(ZkStateReader.SHARD_ID_PROP, shardId);
} else {
// shard id was picked up in getShardId
- props.put("shard_id", cloudDesc.getShardId());
+ props.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
shardId = cloudDesc.getShardId();
publishState(cloudDesc, shardZkNodeName, props);
}
@@ -494,7 +497,7 @@ public final class ZkController {
if (log.isInfoEnabled()) {
log.info("Register shard - core:" + coreName + " address:"
+ shardUrl + "shardId:" + shardId);
- }
+ }
ZkNodeProps zkProps = new ZkNodeProps(props);
@@ -668,33 +671,6 @@ public final class ZkController {
}
}
-
- private void createAssignmentsNode() throws KeeperException, InterruptedException {
- String nodeName = getAssignmentsNode();
-
- try {
-
- if (log.isInfoEnabled()) {
- log.info("creating node:" + nodeName);
- }
-
- zkClient.makePath(nodeName, CreateMode.PERSISTENT, null);
-
- } catch (KeeperException e) {
- // its okay if another beats us creating the node
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- }
public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
String collection = cd.getCollectionName();
@@ -809,12 +785,6 @@ public final class ZkController {
final String nodePath = "/node_states/" + getNodeName();
try {
-
- if (!zkClient.exists(nodePath)) {
- // nocommit: race condition - someone else might make the node first
- zkClient.makePath(nodePath);
- }
-
log.info("publishing node state:" + coreStates.values());
zkClient.setData(
nodePath,
@@ -833,9 +803,9 @@ public final class ZkController {
}
}
- private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor) {
+ private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor) throws InterruptedException {
final String shardZkNodeName = getNodeName() + "_" + coreName;
- int retryCount = 20;
+ int retryCount = 40;
while (retryCount-->0) {
synchronized (assignments) {
CoreAssignment assignment = assignments.get(shardZkNodeName);
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Fri Dec 16 17:07:44 2011
@@ -18,17 +18,23 @@ package org.apache.solr.cloud;
*/
import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.CoreState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.CreateMode;
@@ -131,26 +137,177 @@ public class OverseerTest extends SolrTe
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
}
-
+
+ @Test
+ public void testShardAssignmentBigger() throws Exception {
+ String zkDir = dataDir.getAbsolutePath() + File.separator
+ + "zookeeper/server1/data";
+
+ final int nodeCount = 10; //how many simulated nodes
+ final int coreCount = 66; //how many cores to register
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+ ZkStateReader reader = null;
+ final ZkController[] controllers = new ZkController[nodeCount];
+
+ try {
+ server.run();
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+ zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+ reader = new ZkStateReader(zkClient);
+
+ System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
+
+ for (int i = 0; i < nodeCount; i++) {
+
+ controllers[i] = new ZkController(server.getZkAddress(), TIMEOUT, 10000,
+ "localhost", "898" + i, "solr", new CurrentCoreDescriptorProvider() {
+
+ @Override
+ public List<CoreDescriptor> getCurrentDescriptors() {
+ // do nothing
+ return null;
+ }
+ });
+ }
+
+ System.setProperty("bootstrap_confdir", getFile("solr/conf")
+ .getAbsolutePath());
+
+
+ final ExecutorService[] nodeExecutors = new ExecutorService[nodeCount];
+ for (int i = 0; i < nodeCount; i++) {
+ nodeExecutors[i] = Executors.newFixedThreadPool(1);
+ }
+
+ final String[] ids = new String[coreCount];
+ //register total of coreCount cores
+ for (int i = 0; i < coreCount; i++) {
+ final int slot = i;
+ Runnable coreStarter = new Runnable() {
+ @Override
+ public void run() {
+ // TODO Auto-generated method stub
+ CloudDescriptor collection1Desc = new CloudDescriptor();
+ collection1Desc.setCollectionName("collection1");
+
+ final String coreName = "core" + slot;
+
+ CoreDescriptor desc = new CoreDescriptor(null, coreName, "");
+ desc.setCloudDescriptor(collection1Desc);
+ try {
+ ids[slot] = controllers[slot % nodeCount].register(coreName, desc);
+ } catch (Exception e) {
+ fail("register threw exception:" + e);
+ }
+ }
+ };
+
+ nodeExecutors[i % nodeCount].submit(coreStarter);
+ }
+
+ for (int i = 0; i < nodeCount; i++) {
+ nodeExecutors[i].shutdown();
+ }
+
+ for (int i = 0; i < nodeCount; i++) {
+ while (!nodeExecutors[i].awaitTermination(100, TimeUnit.MILLISECONDS));
+ }
+
+ // make sure all cores have been assigned a id in cloudstate
+ for (int i = 0; i < 40; i++) {
+ reader.updateCloudState(true);
+ CloudState state = reader.getCloudState();
+ Map<String,Slice> slices = state.getSlices("collection1");
+ int count = 0;
+ for (String name : slices.keySet()) {
+ count += slices.get(name).getShards().size();
+ }
+ if (coreCount == count) break;
+ Thread.sleep(200);
+ }
+
+ // make sure all cores have been returned a id
+ for (int i = 0; i < 40; i++) {
+ int assignedCount = 0;
+ for (int j = 0; j < coreCount; j++) {
+ if (ids[j] != null) {
+ assignedCount++;
+ }
+ }
+ if (coreCount == assignedCount) {
+ break;
+ }
+ Thread.sleep(200);
+ }
+
+ final HashMap<String, AtomicInteger> counters = new HashMap<String,AtomicInteger>();
+ for (int i = 1; i < 4; i++) {
+ counters.put("shard" + i, new AtomicInteger());
+ }
+
+ for (int i = 0; i < coreCount; i++) {
+ final AtomicInteger ai = counters.get(ids[i]);
+ assertNotNull("could not find counter for shard:" + ids[i], ai);
+ ai.incrementAndGet();
+ }
+
+ for (String counter: counters.keySet()) {
+ int count = counters.get(counter).intValue();
+ int expectedCount = coreCount / 3;
+ if (count != expectedCount) {
+ fail("unevenly assigned shard ids, " + counter + " had " + count
+ + ", expected " + expectedCount + " (+-1)");
+ }
+ }
+
+ } finally {
+ if (DEBUG) {
+ if (controllers[0] != null) {
+ controllers[0].printLayoutToStdOut();
+ }
+ }
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ for (int i = 0; i < controllers.length; i++)
+ if (controllers[i] != null) {
+ controllers[i].close();
+ }
+ server.shutdown();
+ }
+
+ System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
+ }
+
//wait until i slices for collection have appeared
- private void waitForSliceCount(ZkStateReader stateReader, String collection, int i) throws InterruptedException {
+ private void waitForSliceCount(ZkStateReader stateReader, String collection, int i) throws InterruptedException, KeeperException {
waitForCollections(stateReader, collection);
- int maxIterations = 400;
+ int maxIterations = 200;
while (0 < maxIterations--) {
CloudState state = stateReader.getCloudState();
Map<String,Slice> sliceMap = state.getSlices(collection);
if (sliceMap != null && sliceMap.keySet().size() == i) {
return;
}
- Thread.sleep(50);
+ Thread.sleep(100);
}
}
//wait until collections are available
- private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException {
+ private void waitForCollections(ZkStateReader stateReader, String... collections) throws InterruptedException, KeeperException {
int maxIterations = 100;
while (0 < maxIterations--) {
- Set<String> availableCollections = stateReader.getCloudState().getCollections();
+ stateReader.updateCloudState(true);
+ final CloudState state = stateReader.getCloudState();
+ Set<String> availableCollections = state.getCollections();
int availableCount = 0;
for(String requiredCollection: collections) {
if(availableCollections.contains(requiredCollection)) {
@@ -160,6 +317,7 @@ public class OverseerTest extends SolrTe
Thread.sleep(50);
}
}
+ log.warn("Timeout waiting for collections: " + Arrays.asList(collections) + " state:" + stateReader.getCloudState());
}
@Test
@@ -171,6 +329,7 @@ public class OverseerTest extends SolrTe
SolrZkClient zkClient = null;
ZkStateReader reader = null;
+ SolrZkClient overseerClient = null;
try {
server.run();
@@ -189,15 +348,12 @@ public class OverseerTest extends SolrTe
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
- OverseerElector elector1 = new OverseerElector(zkClient, reader);
+ Overseer.createClientNodes(zkClient, "node1");
ElectionContext ec = new OverseerElectionContext("node1");
- elector1.setup(ec);
- elector1.joinElection(ec);
-
- Thread.sleep(1000);
-
+ overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+
HashMap<String, String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.URL_PROP, "http://127.0.0.1/solr");
coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1");
@@ -230,10 +386,8 @@ public class OverseerTest extends SolrTe
state = new CoreState("core1", "collection1", coreProps);
zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}));
-
- Thread.sleep(2000); // wait for data to update
-
- assertEquals("Illegal state", ZkStateReader.ACTIVE, reader.getCloudState().getSlice("collection1", "shard1").getShards().get("core1").get(ZkStateReader.STATE_PROP));
+
+ verifyStatus(reader, ZkStateReader.ACTIVE);
} finally {
System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
@@ -241,13 +395,28 @@ public class OverseerTest extends SolrTe
if (zkClient != null) {
zkClient.close();
}
+ if (overseerClient != null) {
+ overseerClient.close();
+ }
+
if (reader != null) {
reader.close();
}
server.shutdown();
}
-
+ }
+ private void verifyStatus(ZkStateReader reader, String expectedState) throws InterruptedException {
+ int maxIterations = 100;
+ String coreState = null;
+ while(maxIterations-->0) {
+ coreState = reader.getCloudState().getSlice("collection1", "shard1").getShards().get("core1").get(ZkStateReader.STATE_PROP);
+ if(coreState.equals(expectedState)) {
+ return;
+ }
+ Thread.sleep(50);
+ }
+ fail("Illegal state, was:" + coreState + " expected:" + expectedState + "cloudState:" + reader.getCloudState());
}
@Test
@@ -257,100 +426,76 @@ public class OverseerTest extends SolrTe
ZkTestServer server = new ZkTestServer(zkDir);
- SolrZkClient zkClient = null;
- SolrZkClient zkClient2 = null;
+ SolrZkClient controllerClient = null;
+ SolrZkClient overseerClient = null;
ZkStateReader reader = null;
try {
server.run();
- zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
- zkClient2 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+ controllerClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
- zkClient.makePath("/live_nodes");
+ controllerClient.makePath("/live_nodes");
- // create collections
- Map<String,String> props = new HashMap<String,String>();
- props.put(ZkStateReader.NUM_SHARDS_PROP, "2");
- ZkNodeProps zkProps = new ZkNodeProps(props);
- zkClient.makePath("/collections/collection1",
- ZkStateReader.toJSON(zkProps));
-
- reader = new ZkStateReader(zkClient2);
+ reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
- OverseerElector elector1 = new OverseerElector(zkClient, reader);
+ Overseer.createClientNodes(controllerClient, "node1");
+
ElectionContext ec = new OverseerElectionContext("node1");
- elector1.setup(ec);
- elector1.joinElection(ec);
-
- Thread.sleep(50);
-
-
- OverseerElector elector2 = new OverseerElector(zkClient2, reader);
- elector2.setup(ec);
- elector2.joinElection(ec);
+ overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
// live node
- String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
- zkClient2.makePath(nodePath, CreateMode.EPHEMERAL);
-
+ final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
+ controllerClient.makePath(nodePath, CreateMode.EPHEMERAL);
HashMap<String,String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
CoreState state = new CoreState("core1", "collection1", coreProps);
- nodePath = "/node_states/node1";
-
- try {
- zkClient2.makePath(nodePath, CreateMode.EPHEMERAL);
- } catch (KeeperException ke) {
- if (ke.code() != Code.NODEEXISTS) {
- throw ke;
- }
- }
+ final String statePath = Overseer.STATES_NODE + "/node1";
// publish node state (recovering)
- zkClient2.setData(nodePath, ZkStateReader.toJSON(new CoreState[] {state}));
+ controllerClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}));
// wait overseer assignment
waitForSliceCount(reader, "collection1", 1);
-
- assertEquals("Illegal state", ZkStateReader.RECOVERING,
- reader.getCloudState().getSlice("collection1", "shard1").getShards()
- .get("core1").get(ZkStateReader.STATE_PROP));
-
- //zkClient2.printLayoutToStdOut();
- // close overseer client (kills current overseer)
- zkClient.close();
- zkClient = null;
-
+
+ verifyStatus(reader, ZkStateReader.RECOVERING);
+
// publish node state (active)
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
state = new CoreState("core1", "collection1", coreProps);
+ controllerClient.setData(statePath,
+ ZkStateReader.toJSON(new CoreState[] {state}));
+
+ verifyStatus(reader, ZkStateReader.ACTIVE);
+ overseerClient.close();
- zkClient2
- .setData(nodePath, ZkStateReader.toJSON(new CoreState[] {state}));
-
- // nocommit - we should do short waits and poll
- Thread.sleep(1000); // wait for data to update
-
- // zkClient2.printLayoutToStdOut();
-
- assertEquals("Illegal state", ZkStateReader.ACTIVE,
- reader.getCloudState().getSlice("collection1", "shard1").getShards()
- .get("core1").get(ZkStateReader.STATE_PROP));
-
+ coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+ state = new CoreState("core1", "collection1", coreProps);
+
+ controllerClient.setData(statePath,
+ ZkStateReader.toJSON(new CoreState[] {state}));
+
+ overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+
+ verifyStatus(reader, ZkStateReader.RECOVERING);
+
+ assertEquals("Live nodes count does not match", 1, reader.getCloudState()
+ .getLiveNodes().size());
+ assertEquals("Shard count does not match", 1, reader.getCloudState()
+ .getSlice("collection1", "shard1").getShards().size());
} finally {
- if (zkClient != null) {
- zkClient.close();
+ if (overseerClient != null) {
+ overseerClient.close();
}
- if (zkClient2 != null) {
- zkClient2.close();
+ if (controllerClient != null) {
+ controllerClient.close();
}
if (reader != null) {
reader.close();
@@ -358,4 +503,16 @@ public class OverseerTest extends SolrTe
server.shutdown();
}
}
+
+ private SolrZkClient electNewOverseer(String address,
+ ZkStateReader reader, ElectionContext ec) throws InterruptedException,
+ TimeoutException, IOException, KeeperException {
+ SolrZkClient overseerClient;
+ OverseerElector overseerElector;
+ overseerClient = new SolrZkClient(address, TIMEOUT);
+ overseerElector = new OverseerElector(overseerClient, reader);
+ overseerElector.setup(ec);
+ overseerElector.joinElection(ec);
+ return overseerClient;
+ }
}
\ No newline at end of file
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java?rev=1215227&r1=1215226&r2=1215227&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreAssignment.java Fri Dec 16 17:07:44 2011
@@ -36,7 +36,7 @@ public class CoreAssignment implements J
this.properties = Collections.unmodifiableMap(props);
}
- public CoreAssignment(String coreName, String collectionName, Map<String,String> properties) {
+ public CoreAssignment(String coreName, Map<String,String> properties) {
HashMap<String,String> props = new HashMap<String,String>();
props.putAll(properties);
props.put(CORE, coreName);
@@ -66,12 +66,16 @@ public class CoreAssignment implements J
@Override
public int hashCode() {
- return getCoreName().hashCode();
+ return properties.hashCode();
}
@Override
- public boolean equals(Object obj) {
- return hashCode() == obj.hashCode();
+ public boolean equals(Object other) {
+ if(other instanceof CoreAssignment) {
+ CoreAssignment otherAssignment = (CoreAssignment) other;
+ return this.getProperties().equals(otherAssignment.getProperties());
+ }
+ return false;
}
@Override