You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/01/26 16:19:46 UTC
svn commit: r903261 - in /lucene/solr/branches/cloud/src:
java/org/apache/solr/cloud/ java/org/apache/solr/core/
test/org/apache/solr/cloud/
Author: markrmiller
Date: Tue Jan 26 15:19:46 2010
New Revision: 903261
URL: http://svn.apache.org/viewvc?rev=903261&view=rev
Log:
work on cloudstate change notification
Removed:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java?rev=903261&r1=903260&r2=903261&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java Tue Jan 26 15:19:46 2010
@@ -17,11 +17,16 @@
* limitations under the License.
*/
+import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +52,10 @@
return Collections.unmodifiableMap(collectionState);
}
+ public Set<String> getCollections() {
+ return Collections.unmodifiableSet(collectionStates.keySet());
+ }
+
public Set<String> getLiveNodes() {
return Collections.unmodifiableSet(liveNodes);
}
@@ -54,5 +63,74 @@
public boolean liveNodesContain(String name) {
return liveNodes.contains(name);
}
+
+ public static CloudState buildCloudState(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
+
+ List<String> collections = zkClient.getChildren(ZkController.COLLECTIONS_ZKNODE, null);
+
+ Map<String,Map<String,Slice>> collectionStates = new HashMap<String,Map<String,Slice>>();
+ for (String collection : collections) {
+ String shardIdPaths = ZkController.COLLECTIONS_ZKNODE + "/" + collection + ZkController.SHARDS_ZKNODE;
+ List<String> shardIdNames;
+ try {
+ shardIdNames = zkClient.getChildren(shardIdPaths, null);
+ } catch(KeeperException.NoNodeException e) {
+ // node is not valid currently
+ continue;
+ }
+ Map<String,Slice> slices = new HashMap<String,Slice>();
+ for(String shardIdZkPath : shardIdNames) {
+ Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths + "/" + shardIdZkPath);
+ Slice slice = new Slice(shardIdZkPath, shardsMap);
+ slices.put(shardIdZkPath, slice);
+ }
+ collectionStates.put(collection, slices);
+
+ }
+
+ CloudState cloudInfo = new CloudState(getLiveNodes(zkClient), collectionStates);
+
+ return cloudInfo;
+ }
+
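+ /** Loads the properties of every child node under the given shards path.
+ * @param zkClient client used to query ZooKeeper
+ * @param shardsZkPath path whose children are read; IllegalStateException if absent
+ * @return unmodifiable map from child node name to its loaded ZkNodeProps
+ * @throws KeeperException if a ZooKeeper operation fails
+ * @throws InterruptedException if a ZooKeeper operation is interrupted
+ * @throws IOException if reading or loading a node's properties fails
+ */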
+ private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath)
+ throws KeeperException, InterruptedException, IOException {
+
+ Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
+
+ if (zkClient.exists(shardsZkPath, null) == null) {
+ throw new IllegalStateException("Cannot find zk shards node that should exist:"
+ + shardsZkPath);
+ }
+
+ List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
+
+ for(String shardPath : shardZkPaths) {
+ byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
+ null);
+
+ ZkNodeProps props = new ZkNodeProps();
+ props.load(data);
+ shardNameToProps.put(shardPath, props);
+ }
+
+ return Collections.unmodifiableMap(shardNameToProps);
+ }
+
+ private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+ List<String> liveNodes = zkClient.getChildren(ZkController.NODES_ZKNODE, null);
+ Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
+ liveNodesSet.addAll(liveNodes);
+
+ return liveNodesSet;
+ }
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java?rev=903261&r1=903260&r2=903261&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java Tue Jan 26 15:19:46 2010
@@ -64,7 +64,10 @@
if(connected) {
executor.shutdownNow();
} else {
- delay = delay * 2; // nocommit : back off retry that levels off
+ // nocommit
+ if(delay < 240000) {
+ delay = delay * 2; // nocommit : back off retry that levels off
+ }
executor.schedule(this, delay, TimeUnit.MILLISECONDS);
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=903261&r1=903260&r2=903261&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Tue Jan 26 15:19:46 2010
@@ -23,12 +23,14 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -46,6 +48,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +95,10 @@
private String localHost;
private String hostName;
+
+ private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1);
+
+ private boolean cloudStateUpdateScheduled;
/**
@@ -112,6 +119,7 @@
this.localHostPort = locaHostPort;
this.localHostContext = localHostContext;
this.localHost = localHost;
+ cloudState = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout,
// on reconnect, reload cloud info
new OnReconnect() {
@@ -130,7 +138,7 @@
register(core, false);
}
}
- updateCloudState();
+ updateCloudState(false);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -171,7 +179,8 @@
// makes shards zkNode if it doesn't exist
zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
- // ping that there is a new collection (nocommit : or now possibly a new shardId?)
+ // nocommit
+ // ping that there is a new collection or a new shardId
zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
}
} catch (KeeperException e) {
@@ -398,55 +407,61 @@
}
}
- // package private for tests
- String getNodeName() {
+ public String getNodeName() {
return hostName + ":" + localHostPort + "_"+ localHostContext;
}
// load and publish a new CollectionInfo
- public synchronized void updateCloudState() throws KeeperException, InterruptedException,
+ public synchronized void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
IOException {
// TODO: - incremental update rather than reread everything
- log.info("Updating cloud state from ZooKeeper... :" + zkClient.keeper);
-
// build immutable CloudInfo
-
- List<String> collections = getCollectionNames();
- Map<String,Map<String,Slice>> collectionStates = new HashMap<String,Map<String,Slice>>();
- for (String collection : collections) {
- String shardIdPaths = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE;
- List<String> shardIdNames;
- try {
- shardIdNames = zkClient.getChildren(shardIdPaths, null);
- } catch(KeeperException.NoNodeException e) {
- // node is not valid currently
- continue;
- }
- Map<String,Slice> slices = new HashMap<String,Slice>();
- for(String shardIdZkPath : shardIdNames) {
- Map<String,ZkNodeProps> shardsMap = readShards(shardIdPaths + "/" + shardIdZkPath);
- Slice slice = new Slice(shardIdZkPath, shardsMap);
- slices.put(shardIdZkPath, slice);
+ if(immediate) {
+ log.info("Updating cloud state from ZooKeeper... :" + zkClient.keeper);
+ CloudState cloudState;
+ cloudState = CloudState.buildCloudState(zkClient);
+ // update volatile
+ this.cloudState = cloudState;
+ } else {
+ if(cloudStateUpdateScheduled) {
+ return;
}
- collectionStates.put(collection, slices);
-
+ log.info("Scheduling cloud state update from ZooKeeper...");
+ cloudStateUpdateScheduled = true;
+ updateCloudExecutor.schedule(new Runnable() {
+
+ public void run() {
+ log.info("Updating cloud state from ZooKeeper...");
+ synchronized (ZkController.this) {
+ CloudState cloudState;
+ try {
+ cloudState = CloudState.buildCloudState(zkClient);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ // update volatile
+ ZkController.this.cloudState = cloudState;
+ cloudStateUpdateScheduled = false;
+ }
+ }
+ }, 5000, TimeUnit.MILLISECONDS);
}
-
- CloudState cloudInfo = new CloudState(getLiveNodes(), collectionStates);
-
- // update volatile
- this.cloudState = cloudInfo;
- }
-
- private Set<String> getLiveNodes() throws KeeperException, InterruptedException {
- List<String> liveNodes = zkClient.getChildren(NODES_ZKNODE, null);
- Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
- liveNodesSet.addAll(liveNodes);
- return liveNodesSet;
}
/**
@@ -491,7 +506,7 @@
log.error(
"Multiple configurations were found, but config name to use for collection:"
+ collection + " could not be located", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"Multiple configurations were found, but config name to use for collection:"
+ collection + " could not be located", e);
}
@@ -521,39 +536,6 @@
}
/**
- * @param zkClient
- * @param shardsZkPath
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- */
- private Map<String,ZkNodeProps> readShards(String shardsZkPath)
- throws KeeperException, InterruptedException, IOException {
-
- Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
-
- if (zkClient.exists(shardsZkPath, null) == null) {
- throw new IllegalStateException("Cannot find zk shards node that should exist:"
- + shardsZkPath);
- }
-
- List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
-
- for(String shardPath : shardZkPaths) {
- byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
- null);
-
- ZkNodeProps props = new ZkNodeProps();
- props.load(data);
- shardNameToProps.put(shardPath, props);
- }
-
- return Collections.unmodifiableMap(shardNameToProps);
- }
-
-
- /**
* Register shard. A SolrCore calls this on startup to register with
* ZooKeeper.
*
@@ -595,15 +577,19 @@
byte[] bytes = props.store();
- String shardZkNodeName = hostName + ":" + localHostPort + "_"+ localHostContext + (coreName.length() == 0 ? "" : "_" + coreName);
+ String shardZkNodeName = getNodeName() + "_" + coreName;
if(shardZkNodeAlreadyExists && forcePropsUpdate) {
zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
+ // tell everyone to update cloud info
+ zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
} else {
addZkShardsNode(cloudDesc.getShardId(), collection);
try {
zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
CreateMode.PERSISTENT);
+ // tell everyone to update cloud info
+ zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -660,22 +646,6 @@
zkClient.printLayoutToStdOut();
}
- public void watchShards() throws KeeperException, InterruptedException {
-
- // TODO: don't reload whole state when anything changes - just reload what's
- // changed
- // List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE,
- // null);
- // collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
- // for(String collection : collections) {
- // for(String shardId : zkClient.getChildren(COLLECTIONS_ZKNODE + "/" +
- // collection + SHARDS_ZKNODE, null)) {
- // zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection +
- // SHARDS_ZKNODE + "/" + shardId, shardWatcher);
- // }
- // }
- }
-
private void setUpCollectionsNode() throws KeeperException, InterruptedException {
try {
if (!zkClient.exists(COLLECTIONS_ZKNODE)) {
@@ -703,13 +673,13 @@
log.info("Start watching collections node for changes");
zkClient.getChildren(COLLECTIONS_ZKNODE, new Watcher(){
- public void process(WatchedEvent event) {
+ public synchronized void process(WatchedEvent event) {
try {
// TODO: fine grained - just reload what's changed
// nocommit
- log.info("children changed");
- // something changed, reload cloud state
- updateCloudState();
+ log.info("Notified of collection change");
+ addShardZkNodeWatches();
+ updateCloudState(false);
// re-watch
zkClient.getChildren(event.getPath(), this);
} catch (KeeperException e) {
@@ -729,6 +699,101 @@
}
}});
+
+ zkClient.exists(COLLECTIONS_ZKNODE, new Watcher(){
+
+ public synchronized void process(WatchedEvent event) {
+ if(event.getType() != EventType.NodeDataChanged) {
+ return;
+ }
+ log.info("Notified of CloudState change");
+ try {
+ addShardZkNodeWatches();
+ updateCloudState(false);
+ zkClient.exists(COLLECTIONS_ZKNODE, this);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }});
+ }
+
+ public void addShardZkNodeWatches() throws KeeperException, InterruptedException {
+ CloudState cloudState = getCloudState();
+ Set<String> knownCollections = cloudState.getCollections();
+
+ List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+ for(final String collection : collections) {
+ if(!knownCollections.contains(collection)) {
+ zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE, new Watcher(){
+
+ public void process(WatchedEvent event) {
+ //nocommit
+ System.out.println("ShardId node added/removed/changed:");
+ try {
+ addShardsWatches(collection);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }});
+ }
+ }
+ }
+
+ public void addShardsWatches(String collection) throws KeeperException,
+ InterruptedException {
+ if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
+ List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
+ + collection + SHARDS_ZKNODE, null);
+ CloudState cloudState = getCloudState();
+ Set<String> knownShardIds;
+ Map<String,Slice> slices = cloudState.getSlices(collection);
+ if (slices != null) {
+ knownShardIds = slices.keySet();
+ } else {
+ knownShardIds = new HashSet<String>(0);
+ }
+ for (final String shardId : shardIds) {
+ if (!knownShardIds.contains(shardId)) {
+ zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
+ + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
+
+ public void process(WatchedEvent event) {
+ // nocommit
+ System.out.println("shard changed under:" + shardId);
+
+ }
+ });
+ }
+ }
+ }
+ }
+
+ public void addShardsWatches() throws KeeperException, InterruptedException {
+ List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+ for (final String collection : collections) {
+ addShardsWatches(collection);
+ }
}
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java?rev=903261&r1=903260&r2=903261&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java Tue Jan 26 15:19:46 2010
@@ -95,7 +95,6 @@
}
private void initZooKeeper(String zkHost, int zkClientTimeout) {
- // nocommit: perhaps get from solr.xml
// if zkHost sys property is not set, we are not using ZooKeeper
String zookeeperHost;
if(zkHost == null) {
@@ -196,7 +195,7 @@
public CoreContainer initialize() throws IOException, ParserConfigurationException, SAXException {
CoreContainer cores = null;
String solrHome = SolrResourceLoader.locateSolrHome();
- // nocommit : fix broken logic confusing solr.xml with solrconfig.xml
+ // TODO : fix broken logic confusing solr.xml with solrconfig.xml
File fconf = new File(solrHome, solrConfigFilename == null ? "solr.xml"
: solrConfigFilename);
log.info("looking for solr.xml: " + fconf.getAbsolutePath());
@@ -380,13 +379,13 @@
if (opt != null) {
p.setSchemaName(opt);
}
- // nocommit : default shard list to SHARD: + host:port + context + core
+
opt = DOMUtil.getAttr(node, "shardId", null);
if(testShardIdOverride != null && name.equals("")) {
p.getCloudDescriptor().setShardId(testShardIdOverride);
} else if(zooKeeperController != null) {
if(opt == null) {
- opt = "SHARDID:" + zooKeeperController.getHostName() + ":" + hostPort + "_" + hostContext + "_" + (name.length() == 0 ? "" : "_" + name);
+ opt = "SHARDID:" + zooKeeperController.getNodeName() + "_" + name;
}
p.getCloudDescriptor().setShardId(opt);
}
@@ -425,12 +424,9 @@
if(zooKeeperController != null) {
- // nocommit : exceptions
try {
- zooKeeperController.updateCloudState();
-
- // nocommit : set shards node watches
- zooKeeperController.watchShards();
+ zooKeeperController.addShardZkNodeWatches();
+ zooKeeperController.updateCloudState(true);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -584,8 +580,6 @@
}
IndexSchema schema = null;
if (indexSchemaCache != null) {
- // nocommit: handle ZooKeeper and schema caching
- // schema sharing is enabled. so check if it already is loaded
if (zooKeeperController != null) {
File schemaFile = new File(dcore.getSchemaName());
if (!schemaFile.isAbsolute()) {
@@ -991,7 +985,6 @@
" </cores>\n" +
"</solr>";
- // nocommit: consider - for tests now
public boolean isZooKeeperAware() {
return zooKeeperController != null;
}
@@ -1000,8 +993,4 @@
return zooKeeperController;
}
-
-
-
-
}
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=903261&r1=903260&r2=903261&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Tue Jan 26 15:19:46 2010
@@ -80,7 +80,7 @@
zkController = new ZkController(AbstractZkTestCase.ZOO_KEEPER_ADDRESS, TIMEOUT, "localhost",
"8983", "/solr", null);
- zkController.updateCloudState();
+ zkController.updateCloudState(true);
CloudState cloudInfo = zkController.getCloudState();
Map<String,Slice> slices = cloudInfo.getSlices("collection1");
assertNotNull(slices);
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java?rev=903261&r1=903260&r2=903261&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java Tue Jan 26 15:19:46 2010
@@ -22,6 +22,8 @@
import junit.framework.TestCase;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
public class ZkSolrClientTest extends TestCase {
protected File tmpDir = new File(System.getProperty("java.io.tmpdir")
@@ -137,4 +139,52 @@
}
}
}
+
+ public void testWatchChildren() throws Exception {
+ String zkDir = tmpDir.getAbsolutePath() + File.separator
+ + "zookeeper/server1/data";
+ ZkTestServer server = null;
+ SolrZkClient zkClient = null;
+ try {
+ server = new ZkTestServer(zkDir);
+ server.run();
+
+ AbstractZkTestCase.makeSolrZkNode();
+
+ zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_ADDRESS, 5);
+
+ zkClient.makePath("/collections");
+
+ zkClient.getChildren("/collections", new Watcher(){
+
+ public void process(WatchedEvent event) {
+ System.out.println("children changed");
+
+ }});
+
+ zkClient.makePath("/collections/collection1/shards");
+
+ zkClient.makePath("collections/collection1/config=collection1");
+
+
+ zkClient.makePath("collections/collection1/config=collection3");
+
+
+ zkClient.printLayoutToStdOut();
+
+
+ } catch (Exception e) {
+ // nocommit
+ e.printStackTrace();
+ throw e;
+ } finally {
+
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+ }
}