You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/23 15:26:37 UTC
svn commit: r1222688 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Fri Dec 23 14:26:37 2011
New Revision: 1222688
URL: http://svn.apache.org/viewvc?rev=1222688&view=rev
Log:
apply sami's latest patch
Added:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java (with props)
Removed:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/OverseerElector.java
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Dec 23 14:26:37 2011
@@ -1,5 +1,11 @@
package org.apache.solr.cloud;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -20,32 +26,54 @@ package org.apache.solr.cloud;
public abstract class ElectionContext {
final String electionPath;
- final byte[] leaderProps;
+ final ZkNodeProps leaderProps; // props the elected leader publishes; null for the overseer election
final String id;
+ final String leaderPath; // ZK path the elected leader writes itself to; null for the overseer election
public ElectionContext(final String shardZkNodeName,
- final String electionPath, final byte[] leaderProps) {
+ final String electionPath, final String leaderPath, final ZkNodeProps leaderProps) {
this.id = shardZkNodeName;
this.electionPath = electionPath;
+ this.leaderPath = leaderPath;
this.leaderProps = leaderProps;
}
+ abstract void runLeaderProcess() throws KeeperException, InterruptedException; // hook run for the winning participant; presumably invoked by LeaderElector -- confirm
}
final class ShardLeaderElectionContext extends ElectionContext {
- public ShardLeaderElectionContext(final String shardid,
- final String collection, final String shardZkNodeName, final byte[] props) {
- super(shardZkNodeName, "/collections/" + collection + "/leader_elect/"
- + shardid, props);
+ private final SolrZkClient zkClient;
+ public ShardLeaderElectionContext(final String shardId,
+ final String collection, final String shardZkNodeName, ZkNodeProps props, SolrZkClient zkClient) {
+ super(shardZkNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
+ + shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
+ props);
+ this.zkClient = zkClient;
+ }
+ // Publish leaderProps as JSON (or no data when null) in an EPHEMERAL node at leaderPath, so it goes away with this ZK session.
+ @Override
+ void runLeaderProcess() throws KeeperException, InterruptedException {
+ String currentLeaderZkPath = leaderPath;
+ zkClient.makePath(currentLeaderZkPath, leaderProps == null ? null
+ : ZkStateReader.toJSON(leaderProps), CreateMode.EPHEMERAL);
}
-
}
final class OverseerElectionContext extends ElectionContext {
- public OverseerElectionContext(final String zkNodeName) {
- super(zkNodeName, "/overseer_elect", null);
+ private final SolrZkClient zkClient;
+ private final ZkStateReader stateReader;
+ // The overseer election publishes no leader node: leaderPath and leaderProps are passed as null.
+ public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
+ super(zkNodeName, "/overseer_elect", null, null);
+ this.zkClient = zkClient;
+ this.stateReader = stateReader;
+ }
+ // NOTE(review): the Overseer instance is not retained; presumably its constructor starts the overseer work -- confirm.
+ @Override
+ void runLeaderProcess() throws KeeperException, InterruptedException {
+ new Overseer(zkClient, stateReader);
}
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java Fri Dec 23 14:26:37 2011
@@ -40,8 +40,7 @@ public class NodeStateWatcher implements
private static Logger log = LoggerFactory.getLogger(NodeStateWatcher.class);
public static interface NodeStateChangeListener {
- void coreCreated(String shardZkNodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
- void coreChanged(String nodeName, Set<CoreState> cores) throws KeeperException, InterruptedException;
+ void coreChanged(String nodeName, Set<CoreState> changedCores, Set<CoreState> allCores) throws KeeperException, InterruptedException; // changedCores: new or modified cores; allCores: this node's full current state
}
private final SolrZkClient zkClient;
@@ -57,11 +56,12 @@ public class NodeStateWatcher implements
}
public NodeStateWatcher(SolrZkClient zkClient, String nodeName, String path,
- NodeStateChangeListener listener) {
+ NodeStateChangeListener listener) throws KeeperException, InterruptedException {
this.nodeName = nodeName;
this.zkClient = zkClient;
this.path = path;
this.listener = listener;
+ processStateChange(); // NOTE(review): registers this as a ZK watcher before the constructor has returned
}
public void close() {
@@ -73,8 +73,7 @@ public class NodeStateWatcher implements
if (stop)
return;
try {
- byte[] data = zkClient.getData(path, this, null);
- processStateChange(data);
+ processStateChange();
} catch (KeeperException e) {
// nocommit: stop working on any keeper error
log.warn("Could not talk to ZK", e);
@@ -86,13 +85,15 @@ public class NodeStateWatcher implements
}
}
- void processStateChange(byte[] data) {
+ private void processStateChange() throws KeeperException, InterruptedException { // reads the node state and re-registers this watcher
+ byte[] data = zkClient.getData(path, this, null);
+ // NOTE(review): if path does not exist, getData throws NoNode and ZooKeeper leaves no watch, so a later creation of the node is not observed -- confirm it always exists first
if (data != null) {
CoreState[] states = CoreState.load(data);
List<CoreState> stateList = Arrays.asList(states);
- HashSet<CoreState> newCores = new HashSet<CoreState>();
- newCores.addAll(stateList);
- newCores.removeAll(currentState);
+ HashSet<CoreState> modifiedCores = new HashSet<CoreState>();
+ modifiedCores.addAll(stateList);
+ modifiedCores.removeAll(currentState);
HashSet<CoreState> newState = new HashSet<CoreState>();
newState.addAll(stateList);
@@ -102,22 +103,20 @@ public class NodeStateWatcher implements
lookup.put(state.getCoreName(), state);
}
- HashSet<CoreState> changedCores = new HashSet<CoreState>();
-
//check for status change
for(CoreState state: currentState) {
if(lookup.containsKey(state.getCoreName())) {
if(!state.getProperties().equals(lookup.get(state.getCoreName()).getProperties())) {
- changedCores.add(lookup.get(state.getCoreName()));
+ modifiedCores.add(lookup.get(state.getCoreName()));
}
}
}
currentState = Collections.unmodifiableSet(newState);
- if (newCores.size() > 0) {
+ if (modifiedCores.size() > 0) { // NOTE(review): cores that disappeared from the state are not reported to the listener
try {
- listener.coreCreated(nodeName, Collections.unmodifiableSet(newCores));
+ listener.coreChanged(nodeName, Collections.unmodifiableSet(modifiedCores), currentState);
} catch (KeeperException e) {
log.warn("Could not talk to ZK", e);
} catch (InterruptedException e) {
@@ -126,16 +125,6 @@ public class NodeStateWatcher implements
}
}
- if (changedCores.size() > 0) {
- try {
- listener.coreChanged(nodeName, Collections.unmodifiableSet(changedCores));
- } catch (KeeperException e) {
- log.warn("Could not talk to ZK", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("Could not talk to ZK", e);
- }
- }
} else {
// ignore null state
}
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java?rev=1222688&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java Fri Dec 23 14:26:37 2011
@@ -0,0 +1,121 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watches the leader node of a single shard and reports its props to a
+ * {@link ShardLeaderListener}.
+ * <p>
+ * Every read re-arms a ZooKeeper watch, so subsequent leader changes are
+ * reported as well (until {@link #close()} is called).
+ */
+public class ShardLeaderWatcher implements Watcher {
+  private static final Logger logger = LoggerFactory.getLogger(ShardLeaderWatcher.class);
+
+  /** Receives the current leader props of a shard. */
+  static interface ShardLeaderListener {
+    void announceLeader(String collection, String shardId, ZkNodeProps props);
+  }
+
+  private final String shard;
+  private final String collection;
+  private final String path;
+  private final SolrZkClient zkClient;
+  private volatile boolean closed = false;
+  private final ShardLeaderListener listener;
+
+  /**
+   * Creates the watcher and immediately reads (and watches) the shard's leader node.
+   *
+   * @throws KeeperException on ZooKeeper errors other than connection loss or
+   *         session expiry
+   */
+  public ShardLeaderWatcher(String shard, String collection,
+      SolrZkClient zkClient, ShardLeaderListener listener) throws KeeperException, InterruptedException {
+    this.shard = shard;
+    this.collection = collection;
+    this.path = ZkStateReader.getShardLeadersPath(collection, shard);
+    this.zkClient = zkClient;
+    this.listener = listener;
+    processLeaderChange();
+  }
+
+  /**
+   * Reads the leader node, announces its props and re-arms the watch.
+   * <p>
+   * getData() leaves no watch behind when the node does not exist, so in that
+   * case an exists() watch is set instead; otherwise this watcher would never
+   * be called again once the current leader node is gone.
+   */
+  private void processLeaderChange() throws KeeperException, InterruptedException {
+    if (closed) return;
+    try {
+      while (true) {
+        try {
+          byte[] data = zkClient.getData(path, this, null);
+          if (data != null) {
+            final ZkNodeProps leaderProps = ZkNodeProps.load(data);
+            listener.announceLeader(collection, shard, leaderProps);
+          }
+          return;
+        } catch (KeeperException.NoNodeException e) {
+          if (zkClient.exists(path, this) == null) {
+            // no leader yet: the exists() watch fires when one is registered
+            return;
+          }
+          // the leader node appeared between the two calls: read it again
+        }
+      }
+    } catch (KeeperException ke) {
+      //check if we lost connection or the node was gone
+      if (ke.code() != Code.CONNECTIONLOSS && ke.code() != Code.SESSIONEXPIRED
+          && ke.code() != Code.NONODE) {
+        throw ke;
+      }
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    try {
+      processLeaderChange();
+    } catch (KeeperException e) {
+      logger.warn("Shard leader watch triggered but Solr cannot talk to zk.", e);
+    } catch (InterruptedException e) {
+      // restore the interrupt status; Thread.interrupted() would clear it instead
+      Thread.currentThread().interrupt();
+      logger.warn("Shard leader watch triggered but Solr cannot talk to zk.", e);
+    }
+  }
+
+  /** Stops announcing leader changes; an already-registered watch is not removed. */
+  public void close() {
+    closed = true;
+  }
+
+}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Fri Dec 23 14:26:37 2011
@@ -140,8 +140,8 @@ public class CloudStateUpdateTest extend
ZkNodeProps zkProps2 = new ZkNodeProps(props2);
SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
- zkClient.makePath("/collections/testcore", ZkStateReader.toJSON(zkProps2), CreateMode.PERSISTENT);
- zkClient.makePath("/collections/testcore/shards", CreateMode.PERSISTENT);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore", ZkStateReader.toJSON(zkProps2), CreateMode.PERSISTENT); // collection node holding zkProps2 as JSON
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore/shards", CreateMode.PERSISTENT); // its shards node, created with no data
zkClient.close();
CoreDescriptor dcore = new CoreDescriptor(container1, "testcore",
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java Fri Dec 23 14:26:37 2011
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
@@ -32,9 +31,11 @@ import javax.xml.parsers.ParserConfigura
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreContainer.Initializer;
import org.apache.solr.core.SolrConfig;
+import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -62,6 +63,8 @@ public class LeaderElectionIntegrationTe
private Map<String,Set<Integer>> shardPorts = new HashMap<String,Set<Integer>>();
private SolrZkClient zkClient;
+
+ private ZkStateReader reader;
@BeforeClass
public static void beforeClass() throws Exception {}
@@ -94,6 +97,9 @@ public class LeaderElectionIntegrationTe
zkClient = new SolrZkClient(zkServer.getZkAddress(),
AbstractZkTestCase.TIMEOUT);
+
+ reader = new ZkStateReader(zkClient);
+
log.info("####SETUP_END " + getName());
}
@@ -122,7 +128,6 @@ public class LeaderElectionIntegrationTe
public void testSimpleSliceLeaderElection() throws Exception {
//printLayout(zkServer.getZkAddress());
-
for (int i = 0; i < 4; i++) {
// who is the leader?
String leader = getLeader();
@@ -140,8 +145,14 @@ public class LeaderElectionIntegrationTe
//printLayout(zkServer.getZkAddress());
- // wait a sec for new leader to register
- Thread.sleep(2000);
+ // poll until leader change is visible
+ for (int j = 0; j < 30; j++) {
+ String currentLeader = getLeader();
+ if(!leader.equals(currentLeader)) {
+ break;
+ }
+ Thread.sleep(100);
+ }
leader = getLeader();
int newLeaderPort = getLeaderPort(leader);
@@ -165,10 +176,12 @@ public class LeaderElectionIntegrationTe
int leaderPort = getLeaderPort(leader);
containerMap.get(leaderPort).getZkController().getZkClient().getSolrZooKeeper().pauseCnxn(2000);
- Thread.sleep(4000);
-
- // first leader should not be leader anymore
- assertNotSame(leaderPort, getLeaderPort(getLeader()));
+ for (int i = 0; i < 30; i++) { // wait till leader is changed
+ if (leaderPort != getLeaderPort(getLeader())) {
+ break;
+ }
+ Thread.sleep(100);
+ }
if (VERBOSE) System.out.println("kill everyone");
// kill everyone but the first leader that should have reconnected by now
@@ -177,33 +190,39 @@ public class LeaderElectionIntegrationTe
entry.getValue().shutdown();
}
}
-
- Thread.sleep(1000);
-
+
+ for (int i = 0; i < 30; i++) { // wait till leader is changed
+ if (leaderPort == getLeaderPort(getLeader())) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
// the original leader should be leader again now - everyone else is down
assertEquals(leaderPort, getLeaderPort(getLeader()));
//printLayout(zkServer.getZkAddress());
//Thread.sleep(100000);
}
- private String getLeader() throws Exception {
+ private String getLeader() throws InterruptedException {
String leader = null;
int tries = 30;
- while (true) {
- List<String> leaderChildren = zkClient.getChildren(
- "/collections/collection1/leader_elect/shard1/leader", null);
- if (leaderChildren.size() > 0) {
- assertEquals("There should only be one leader", 1,
- leaderChildren.size());
- leader = leaderChildren.get(0);
- break;
- } else {
- if (tries-- == 0) {
- printLayout(zkServer.getZkAddress());
- fail("No registered leader was found");
+ while (tries-- > 0) {
+ ZkNodeProps props;
+ try {
+ reader.updateCloudState(true);
+ props = reader.getLeaderProps("collection1", "shard1");
+ leader = props == null ? null : props.get(ZkStateReader.NODE_NAME_PROP); // no props yet: keep polling
+ if (leader != null) {
+ break;
}
- Thread.sleep(1000);
+ } catch (KeeperException e) {
+ // ignore
}
+ Thread.sleep(100);
}
+ if (leader == null) { // fail clearly instead of handing callers a null leader
+ fail("No registered leader was found");
+ }
return leader;
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Fri Dec 23 14:26:37 2011
@@ -29,8 +29,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrConfig;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -73,11 +76,14 @@ public class LeaderElectionTest extends
private int nodeNumber;
private int seq = -1;
private volatile boolean stop;
+ private volatile boolean electionDone = false;
+ private final ZkNodeProps props;
public ClientThread(int nodeNumber) throws Exception {
super("Thread-" + nodeNumber);
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
this.nodeNumber = nodeNumber;
+ props = new ZkNodeProps(ZkStateReader.URL_PROP, Integer.toString(nodeNumber));
}
@Override
@@ -86,14 +92,14 @@ public class LeaderElectionTest extends
LeaderElector elector = new LeaderElector(zkClient);
ElectionContext context = new ShardLeaderElectionContext("shard1",
- "collection1", Integer.toString(nodeNumber), null);
+ "collection1", Integer.toString(nodeNumber), props, zkClient);
try {
elector.setup(context);
-
seq = elector.joinElection(context);
+ electionDone = true;
seqToThread.put(seq, this);
- } catch (Exception e) {
+ } catch (Throwable e) {
e.printStackTrace();
}
@@ -114,31 +120,32 @@ public class LeaderElectionTest extends
this.stop = true;
}
}
-
+
@Test
- public void testElection() throws Exception {
- // add a dummy slice, just for variance - call it shard2
-
- SolrZkClient zkClient1 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-
- LeaderElector elector = new LeaderElector(zkClient1);
-
- ElectionContext context = new ShardLeaderElectionContext("shard2", "collection1", "dummynode1", null);
-
+ public void testBasic() throws Exception {
+ LeaderElector elector = new LeaderElector(zkClient);
+ ZkNodeProps props = new ZkNodeProps(ZkStateReader.URL_PROP,"http://127.0.0.1/solr");
+ ElectionContext context = new ShardLeaderElectionContext("shard2", "collection1", "dummynode1", props, zkClient);
elector.setup(context);
elector.joinElection(context);
- zkClient1.close();
-
- SolrZkClient zkClient2 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
-
- LeaderElector elector2 = new LeaderElector(zkClient2);
-
- ElectionContext context2 = new ShardLeaderElectionContext("shard2", "collection1", "dummynode2", null);
+ assertEquals("http://127.0.0.1/solr", getLeaderUrl("collection1", "shard2"));
+ }
+
+ private String getLeaderUrl(String collection, String slice) throws KeeperException, InterruptedException {
+ int iterCount=30;
+ while (iterCount-- > 0)
+ try {
+ byte[] data = zkClient.getData(ZkStateReader.getShardLeadersPath(collection, slice), null, null);
+ ZkNodeProps leaderProps = ZkNodeProps.load(data);
+ return leaderProps.get(ZkStateReader.URL_PROP);
+ } catch (NoNodeException e) {
+ Thread.sleep(100);
+ }
+ throw new RuntimeException("Could not get leader props");
+ }
- elector2.setup(context2);
- elector2.joinElection(context2);
-
- zkClient2.close();
+ @Test
+ public void testElection() throws Exception {
List<ClientThread> threads = new ArrayList<ClientThread>();
@@ -152,14 +159,24 @@ public class LeaderElectionTest extends
thread.start();
}
- // make sure the leader node is there from the start
- try {
- zkClient.makePath("/collections/collection1/leader_elect/shard1/leader");
- } catch (KeeperException.NodeExistsException e) {
- // thats fine
+
+ // wait (for at most ~30s) until every client thread has joined the election
+ int doneCount = 0;
+ for (int tries = 0; tries < 300; tries++) {
+ doneCount = 0;
+ for (ClientThread thread : threads) {
+ if(thread.electionDone) {
+ doneCount++;
+ }
+ }
+ if(doneCount == threads.size()) {
+ break;
+ }
+ Thread.sleep(100);
}
+ assertEquals("not every client thread joined the election", threads.size(), doneCount);
- int leaderThread = Integer.parseInt(getLeader());
+ int leaderThread = Integer.parseInt(getLeaderUrl("collection1", "shard1"));
// whoever the leader is, should be the n_0 seq
assertEquals(0, threads.get(leaderThread).seq);
@@ -170,9 +187,7 @@ public class LeaderElectionTest extends
((ClientThread) seqToThread.get(1)).close();
((ClientThread) seqToThread.get(3)).close();
- Thread.sleep(50);
-
- leaderThread = Integer.parseInt(getLeader());
+ leaderThread = Integer.parseInt(getLeaderUrl("collection1", "shard1"));
// whoever the leader is, should be the n_2 seq
assertEquals(2, threads.get(leaderThread).seq);
@@ -184,9 +199,7 @@ public class LeaderElectionTest extends
((ClientThread) seqToThread.get(7)).close();
((ClientThread) seqToThread.get(8)).close();
- Thread.sleep(50);
-
- leaderThread = Integer.parseInt(getLeader());
+ leaderThread = Integer.parseInt(getLeaderUrl("collection1", "shard1"));
// whoever the leader is, should be the n_9 seq
assertEquals(9, threads.get(leaderThread).seq);
@@ -205,6 +218,7 @@ public class LeaderElectionTest extends
@Test
public void testStressElection() throws Exception {
+ //TODO add assertions
final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(100);
final List<ClientThread> threads = Collections
@@ -278,37 +292,6 @@ public class LeaderElectionTest extends
}
- private String getLeader() throws Exception {
-
- String leader = null;
- int tries = 30;
- while (true) {
- if (!zkClient.exists("/collections/collection1/leader_elect/shard1/leader")) {
- if (tries-- == 0) {
- printLayout(server.getZkAddress());
- fail("No registered leader was found");
- }
- Thread.sleep(1000);
- continue;
- }
- List<String> leaderChildren = zkClient.getChildren(
- "/collections/collection1/leader_elect/shard1/leader", null);
- if (leaderChildren.size() > 0) {
- assertEquals("There should only be one leader", 1,
- leaderChildren.size());
- leader = leaderChildren.get(0);
- break;
- } else {
- if (tries-- == 0) {
- printLayout(server.getZkAddress());
- fail("No registered leader was found");
- }
- Thread.sleep(1000);
- }
- }
- return leader;
- }
-
@Override
public void tearDown() throws Exception {
zkClient.close();
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Fri Dec 23 14:26:37 2011
@@ -69,6 +69,7 @@ public class OverseerTest extends SolrTe
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+ ZkStateReader reader = new ZkStateReader(zkClient);
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
@@ -84,46 +85,37 @@ public class OverseerTest extends SolrTe
System.setProperty("bootstrap_confdir", getFile("solr/conf")
.getAbsolutePath());
-
- CloudDescriptor collection1Desc = new CloudDescriptor();
- collection1Desc.setCollectionName("collection1");
-
- CoreDescriptor desc = new CoreDescriptor(null, "core1", "");
- desc.setCloudDescriptor(collection1Desc);
- String shard1 = zkController.register("core1", desc);
- collection1Desc.setShardId(null);
- desc = new CoreDescriptor(null, "core2", "");
- desc.setCloudDescriptor(collection1Desc);
- String shard2 = zkController.register("core2", desc);
- collection1Desc.setShardId(null);
- desc = new CoreDescriptor(null, "core3", "");
- desc.setCloudDescriptor(collection1Desc);
- String shard3 = zkController.register("core3", desc);
- collection1Desc.setShardId(null);
- desc = new CoreDescriptor(null, "core4", "");
- desc.setCloudDescriptor(collection1Desc);
- String shard4 = zkController.register("core4", desc);
- collection1Desc.setShardId(null);
- desc = new CoreDescriptor(null, "core5", "");
- desc.setCloudDescriptor(collection1Desc);
- String shard5 = zkController.register("core5", desc);
- collection1Desc.setShardId(null);
- desc = new CoreDescriptor(null, "core6", "");
- desc.setCloudDescriptor(collection1Desc);
- String shard6 = zkController.register("core6", desc);
- collection1Desc.setShardId(null);
-
- assertEquals("shard1", shard1);
- assertEquals("shard2", shard2);
- assertEquals("shard3", shard3);
- assertEquals("shard1", shard4);
- assertEquals("shard2", shard5);
- assertEquals("shard3", shard6);
+ final int numCores=6; // cores to register, spread over the 3 shards set in NUM_SHARDS_PROP
+ final String[] ids = new String[numCores];
+
+ for (int i = 0; i < numCores; i++) {
+ CloudDescriptor collection1Desc = new CloudDescriptor();
+ collection1Desc.setCollectionName("collection1");
+ CoreDescriptor desc1 = new CoreDescriptor(null, "core"
+ + (i + 1), "");
+ desc1.setCloudDescriptor(collection1Desc);
+ ids[i] = zkController.register("core" + (i + 1), desc1);
+ }
+
+ assertEquals("shard1", ids[0]);
+ assertEquals("shard2", ids[1]);
+ assertEquals("shard3", ids[2]);
+ assertEquals("shard1", ids[3]);
+ assertEquals("shard2", ids[4]);
+ assertEquals("shard3", ids[5]);
+
+ waitForSliceCount(reader, "collection1", 3);
+
+ //make sure leaders are in cloud state
+ assertNotNull(reader.getLeaderUrl("collection1", "shard1"));
+ assertNotNull(reader.getLeaderUrl("collection1", "shard2"));
+ assertNotNull(reader.getLeaderUrl("collection1", "shard3"));
+
} finally {
if (DEBUG) {
- if (zkController != null) {
- zkController.printLayoutToStdOut();
+ if (zkClient != null) {
+ zkClient.printLayoutToStdOut();
}
}
if (zkClient != null) {
@@ -268,7 +260,7 @@ public class OverseerTest extends SolrTe
} finally {
if (DEBUG) {
- if (controllers[0] != null) {
- controllers[0].printLayoutToStdOut();
+ if (zkClient != null) {
+ zkClient.printLayoutToStdOut();
}
}
if (zkClient != null) {
@@ -350,9 +342,7 @@ public class OverseerTest extends SolrTe
Overseer.createClientNodes(zkClient, "node1");
- ElectionContext ec = new OverseerElectionContext("node1");
-
- overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+ overseerClient = electNewOverseer(server.getZkAddress());
HashMap<String, String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.URL_PROP, "http://127.0.0.1/solr");
@@ -442,11 +432,7 @@ public class OverseerTest extends SolrTe
reader.createClusterStateWatchersAndUpdate();
Overseer.createClientNodes(controllerClient, "node1");
-
-
- ElectionContext ec = new OverseerElectionContext("node1");
-
- overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+ overseerClient = electNewOverseer(server.getZkAddress());
// live node
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
@@ -481,7 +467,7 @@ public class OverseerTest extends SolrTe
controllerClient.setData(statePath,
ZkStateReader.toJSON(new CoreState[] {state}));
- overseerClient = electNewOverseer(server.getZkAddress(), reader, ec);
+ overseerClient = electNewOverseer(server.getZkAddress());
verifyStatus(reader, ZkStateReader.RECOVERING);
@@ -504,15 +490,14 @@ public class OverseerTest extends SolrTe
}
}
- private SolrZkClient electNewOverseer(String address,
- ZkStateReader reader, ElectionContext ec) throws InterruptedException,
+ private SolrZkClient electNewOverseer(String address) throws InterruptedException,
TimeoutException, IOException, KeeperException {
- SolrZkClient overseerClient;
- OverseerElector overseerElector;
- overseerClient = new SolrZkClient(address, TIMEOUT);
- overseerElector = new OverseerElector(overseerClient, reader);
+ SolrZkClient overseerClient = new SolrZkClient(address, TIMEOUT);
+ ZkStateReader reader = new ZkStateReader(overseerClient); // NOTE(review): unlike the callers' readers, no createClusterStateWatchersAndUpdate() is called here -- confirm Overseer does it
+ LeaderElector overseerElector = new LeaderElector(overseerClient);
+ ElectionContext ec = new OverseerElectionContext(address, overseerClient, reader); // NOTE(review): election name is the ZK address, not a node name such as "node1" -- confirm
overseerElector.setup(ec);
overseerElector.joinElection(ec);
- return overseerClient;
+ return overseerClient;
}
}
\ No newline at end of file
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Fri Dec 23 14:26:37 2011
@@ -31,12 +31,8 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.HashPartitioner.Range;
import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-// quasi immutable :(
public class CloudState implements JSONWriter.Writable {
- protected static Logger log = LoggerFactory.getLogger(CloudState.class);
private final Map<String, Map<String,Slice>> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private final Set<String> liveNodes;
@@ -65,25 +61,6 @@ public class CloudState implements JSONW
return null;
}
- // TODO: this method must die - this object should be immutable!!
- public void addSlice(String collection, Slice slice) {
- if (!collectionStates.containsKey(collection)) {
- log.info("New collection");
- collectionStates.put(collection, new HashMap<String,Slice>());
- }
- if (!collectionStates.get(collection).containsKey(slice.getName())) {
- collectionStates.get(collection).put(slice.getName(), slice);
- } else {
- Map<String,ZkNodeProps> shards = new HashMap<String,ZkNodeProps>();
-
- Slice existingSlice = collectionStates.get(collection).get(slice.getName());
- shards.putAll(existingSlice.getShards());
- shards.putAll(slice.getShards());
- Slice updatedSlice = new Slice(slice.getName(), shards);
- collectionStates.get(collection).put(slice.getName(), updatedSlice);
- }
- }
-
public Map<String, Slice> getSlices(String collection) {
if(!collectionStates.containsKey(collection))
return null;
@@ -177,10 +154,10 @@ public class CloudState implements JSONW
for(String collectionName: stateMap.keySet()){
Map<String, Object> collection = (Map<String, Object>)stateMap.get(collectionName);
- HashMap<String, Slice> slices = new HashMap<String,Slice>();
+ Map<String, Slice> slices = new LinkedHashMap<String,Slice>(); // keeps slices in the iteration order of the parsed map; NOTE(review): no hunk adds a java.util.LinkedHashMap import -- confirm it is already imported
for(String sliceName: collection.keySet()) {
Map<String, Map<String, String>> sliceMap = (Map<String, Map<String, String>>)collection.get(sliceName);
- HashMap<String, ZkNodeProps> shards = new HashMap<String,ZkNodeProps>();
+ Map<String, ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>(); // insertion-ordered, like slices
for(String shardName: sliceMap.keySet()) {
shards.put(shardName, new ZkNodeProps(sliceMap.get(shardName)));
}
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java Fri Dec 23 14:26:37 2011
@@ -74,12 +74,16 @@ public class CoreState implements JSONWr
+ /**
+ * Hash code computed from this core's properties, mirroring equals(Object)
+ * (assumes getProperties() exposes the same properties object - confirm).
+ */
@Override
public int hashCode() {
- return getCoreName().hashCode();
+ return properties.hashCode();
}
+ /**
+ * Two CoreStates are equal when their properties are equal; anything that is
+ * not a CoreState (including null) is never equal.
+ */
@Override
- public boolean equals(Object obj) {
- return hashCode() == obj.hashCode();
+ public boolean equals(Object other) {
+ if (!(other instanceof CoreState)) {
+ return false;
+ }
+ return getProperties().equals(((CoreState) other).getProperties());
}
@Override
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1222688&r1=1222687&r2=1222688&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Dec 23 14:26:37 2011
@@ -46,14 +46,16 @@ import org.slf4j.LoggerFactory;
public class ZkStateReader {
private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
- public static final String COLLECTIONS_ZKNODE = "/collections";
public static final String URL_PROP = "url";
public static final String NODE_NAME_PROP = "node_name";
public static final String ROLES_PROP = "roles";
public static final String STATE_PROP = "state";
+ public static final String CORE_PROP = "core";
public static final String SHARD_ID_PROP = "shard_id";
public static final String NUM_SHARDS_PROP = "numShards";
+ public static final String LEADER_PROP = "leader";
+ public static final String COLLECTIONS_ZKNODE = "/collections";
public static final String LIVE_NODES_ZKNODE = "/live_nodes";
public static final String CLUSTER_STATE = "/clusterstate.json";
@@ -65,6 +67,8 @@ public class ZkStateReader {
private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
+
+ public static final String SHARD_LEADERS_ZKNODE = "leaders";
//
// convenience methods... should these go somewhere else?
@@ -377,32 +381,27 @@ public class ZkStateReader {
+ /**
+ * Looks up the leader of a shard in the cached cloud state.
+ *
+ * Each attempt scans the shards of the slice for one whose props contain
+ * LEADER_PROP. Up to 30 attempts are made; between attempts it sleeps 200 ms
+ * and calls updateCloudState(true) (presumably an immediate refresh - confirm).
+ *
+ * @return the props of the shard marked as leader
+ * @throws RuntimeException if no leader is found after the last attempt
+ */
public ZkNodeProps getLeaderProps(String collection, String shard) throws InterruptedException, KeeperException {
int tries = 30;
- ZkNodeProps props;
- while (true) {
- if (!zkClient
- .exists("/collections/" + collection + "/leader_elect/" + shard + "/leader")) {
- if (tries-- == 0) {
- throw new RuntimeException("No registered leader was found");
- }
- Thread.sleep(1000);
- continue;
- }
- String leaderPath = "/collections/" + collection + "/leader_elect/" + shard + "/leader";
- List<String> leaderChildren = zkClient.getChildren(
- leaderPath, null);
- if (leaderChildren.size() > 0) {
- String leader = leaderChildren.get(0);
- byte[] data = zkClient.getData(leaderPath + "/" + leader, null, null);
- props = ZkNodeProps.load(data);
- break;
- } else {
- if (tries-- == 0) {
- throw new RuntimeException("No registered leader was found");
+ while (tries-- > 0) {
+ // Read the field once so the null check and the lookup use the same state.
+ CloudState state = cloudState;
+ if (state != null) {
+ Slice slice = state.getSlice(collection, shard);
+ if (slice != null) {
+ for (ZkNodeProps nodeProps : slice.getShards().values()) {
+ if (nodeProps.containsKey(ZkStateReader.LEADER_PROP)) {
+ return nodeProps;
+ }
+ }
}
- Thread.sleep(1000);
}
+ if (tries == 0) {
+ // That was the final attempt: skip the sleep and a refresh nobody would read.
+ break;
+ }
+ Thread.sleep(200);
+ updateCloudState(true);
}
- return props;
+ throw new RuntimeException("No registered leader was found, collection:" + collection + " slice:" + shard);
+ }
+
+ /**
+ * Builds the path COLLECTIONS_ZKNODE/collection/SHARD_LEADERS_ZKNODE, with
+ * "/shardId" appended when shardId is non-null.
+ */
+ public static String getShardLeadersPath(String collection, String shardId) {
+ String leadersPath = COLLECTIONS_ZKNODE + "/" + collection + "/" + SHARD_LEADERS_ZKNODE;
+ if (shardId == null) {
+ return leadersPath;
+ }
+ return leadersPath + "/" + shardId;
}
}