You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/02/01 16:40:47 UTC
svn commit: r905311 - in /lucene/solr/branches/cloud/src:
java/org/apache/solr/cloud/CloudState.java
java/org/apache/solr/cloud/ZkController.java
test/org/apache/solr/cloud/CloudStateUpdateTest.java
Author: markrmiller
Date: Mon Feb 1 15:40:46 2010
New Revision: 905311
URL: http://svn.apache.org/viewvc?rev=905311&view=rev
Log:
all for only updating live nodes, add watch for live node changes, add simple test for live node additions and removals
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java?rev=905311&r1=905310&r2=905311&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java Mon Feb 1 15:40:46 2010
@@ -56,6 +56,10 @@
return Collections.unmodifiableSet(collectionStates.keySet());
}
+ public Map<String,Map<String,Slice>> getCollectionStates() {
+ return Collections.unmodifiableMap(collectionStates);
+ }
+
public Set<String> getLiveNodes() {
return Collections.unmodifiableSet(liveNodes);
}
@@ -64,28 +68,35 @@
return liveNodes.contains(name);
}
- public static CloudState buildCloudState(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
+ public static CloudState buildCloudState(SolrZkClient zkClient, CloudState oldCloudState, boolean onlyLiveNodes) throws KeeperException, InterruptedException, IOException {
+ Map<String,Map<String,Slice>> collectionStates;
+ if (!onlyLiveNodes) {
+ List<String> collections = zkClient.getChildren(
+ ZkController.COLLECTIONS_ZKNODE, null);
+
+ collectionStates = new HashMap<String,Map<String,Slice>>();
+ for (String collection : collections) {
+ String shardIdPaths = ZkController.COLLECTIONS_ZKNODE + "/"
+ + collection + ZkController.SHARDS_ZKNODE;
+ List<String> shardIdNames;
+ try {
+ shardIdNames = zkClient.getChildren(shardIdPaths, null);
+ } catch (KeeperException.NoNodeException e) {
+ // node is not valid currently
+ continue;
+ }
+ Map<String,Slice> slices = new HashMap<String,Slice>();
+ for (String shardIdZkPath : shardIdNames) {
+ Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
+ + "/" + shardIdZkPath);
+ Slice slice = new Slice(shardIdZkPath, shardsMap);
+ slices.put(shardIdZkPath, slice);
+ }
+ collectionStates.put(collection, slices);
- List<String> collections = zkClient.getChildren(ZkController.COLLECTIONS_ZKNODE, null);
-
- Map<String,Map<String,Slice>> collectionStates = new HashMap<String,Map<String,Slice>>();
- for (String collection : collections) {
- String shardIdPaths = ZkController.COLLECTIONS_ZKNODE + "/" + collection + ZkController.SHARDS_ZKNODE;
- List<String> shardIdNames;
- try {
- shardIdNames = zkClient.getChildren(shardIdPaths, null);
- } catch(KeeperException.NoNodeException e) {
- // node is not valid currently
- continue;
}
- Map<String,Slice> slices = new HashMap<String,Slice>();
- for(String shardIdZkPath : shardIdNames) {
- Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths + "/" + shardIdZkPath);
- Slice slice = new Slice(shardIdZkPath, shardsMap);
- slices.put(shardIdZkPath, slice);
- }
- collectionStates.put(collection, slices);
-
+ } else {
+ collectionStates = oldCloudState.getCollectionStates();
}
CloudState cloudInfo = new CloudState(getLiveNodes(zkClient), collectionStates);
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=905311&r1=905310&r2=905311&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Mon Feb 1 15:40:46 2010
@@ -128,7 +128,7 @@
public void command() {
try {
- createEphemeralNode();
+ createEphemeralLiveNode();
// register cores in case any new cores came online will zk was down
// coreContainer may currently be null in tests, so don't re-register
@@ -361,7 +361,7 @@
"", e);
}
}
- createEphemeralNode();
+ createEphemeralLiveNode();
setUpCollectionsNode();
@@ -383,11 +383,41 @@
}
- private void createEphemeralNode() throws KeeperException,
+ private void createEphemeralLiveNode() throws KeeperException,
InterruptedException {
String nodeName = getNodeName();
String nodePath = NODES_ZKNODE + "/" + nodeName;
log.info("Register node as live in ZooKeeper:" + nodePath);
+ Watcher liveNodeWatcher = new Watcher() {
+
+ public void process(WatchedEvent event) {
+ try {
+ log.info("Updating live nodes:" + zkClient);
+ try {
+ updateLiveNodes();
+ } finally {
+ // remake watch
+ zkClient.getChildren(event.getPath(), this);
+ }
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }
+
+ };
try {
zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
@@ -396,6 +426,7 @@
throw e;
}
}
+ zkClient.getChildren(NODES_ZKNODE, liveNodeWatcher);
}
public String getNodeName() {
@@ -403,7 +434,19 @@
}
// load and publish a new CollectionInfo
- public synchronized void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+ public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+ IOException {
+ updateCloudState(immediate, false);
+ }
+
+ // load and publish a new CollectionInfo
+ public void updateLiveNodes() throws KeeperException, InterruptedException,
+ IOException {
+ updateCloudState(true, true);
+ }
+
+ // load and publish a new CollectionInfo
+ private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
IOException {
// TODO: - incremental update rather than reread everything
@@ -411,9 +454,13 @@
// build immutable CloudInfo
if(immediate) {
- log.info("Updating cloud state from ZooKeeper... :" + zkClient.keeper);
+ if(!onlyLiveNodes) {
+ log.info("Updating cloud state from ZooKeeper... ");
+ } else {
+ log.info("Updating live nodes from ZooKeeper... ");
+ }
CloudState cloudState;
- cloudState = CloudState.buildCloudState(zkClient);
+ cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
// update volatile
this.cloudState = cloudState;
} else {
@@ -431,7 +478,8 @@
cloudStateUpdateScheduled = false;
CloudState cloudState;
try {
- cloudState = CloudState.buildCloudState(zkClient);
+ cloudState = CloudState.buildCloudState(zkClient,
+ ZkController.this.cloudState, onlyLiveNodes);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=905311&r1=905310&r2=905311&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Mon Feb 1 15:40:46 2010
@@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ *
+ */
public class CloudStateUpdateTest extends TestCase {
protected static Logger log = LoggerFactory
.getLogger(AbstractZkTestCase.class);
@@ -44,14 +47,12 @@
protected String zkDir;
private CoreContainer container1;
-
private CoreContainer container2;
-
private CoreContainer container3;
private File dataDir1;
-
private File dataDir2;
+ private File dataDir3;
public void setUp() throws Exception {
try {
@@ -69,6 +70,9 @@
dataDir2 = new File(tmpDir + File.separator + "data2");
dataDir2.mkdirs();
+
+ dataDir3 = new File(tmpDir + File.separator + "data3");
+ dataDir3.mkdirs();
// set some system properties for use by tests
System.setProperty("solr.test.sys.prop1", "propone");
@@ -94,7 +98,7 @@
CoreContainer.Initializer init3 = new CoreContainer.Initializer() {
{
- this.dataDir = CloudStateUpdateTest.this.dataDir2.getAbsolutePath();
+ this.dataDir = CloudStateUpdateTest.this.dataDir3.getAbsolutePath();
this.zkPortOverride = "8985";
}
};
@@ -149,8 +153,10 @@
liveNodes = zkController2.getCloudState().getLiveNodes();
- // nocommit - fix update cloud state when nodes removed
- //assertEquals(2, liveNodes.size());
+ // slight pause for watch to trigger
+ Thread.sleep(500);
+
+ assertEquals(2, liveNodes.size());
}
public void tearDown() throws Exception {