You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/07 18:17:08 UTC
svn commit: r1228672 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/ webapp/web/ webapp/web/admin/
Author: markrmiller
Date: Sat Jan 7 17:17:07 2012
New Revision: 1228672
URL: http://svn.apache.org/viewvc?rev=1228672&view=rev
Log:
move retries into our zk solr client and force a choice on connection loss for each call
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/branches/solrcloud/solr/webapp/web/admin/zookeeper.jsp
lucene/dev/branches/solrcloud/solr/webapp/web/zookeeper.jsp
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sat Jan 7 17:17:07 2012
@@ -1,10 +1,8 @@
package org.apache.solr.cloud;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZkOperation;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -46,28 +44,20 @@ public abstract class ElectionContext {
final class ShardLeaderElectionContext extends ElectionContext {
private final SolrZkClient zkClient;
- private ZkCmdExecutor proto;
+
public ShardLeaderElectionContext(final String shardId,
final String collection, final String shardZkNodeName, ZkNodeProps props, SolrZkClient zkClient) {
super(shardZkNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
+ shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
props);
this.zkClient = zkClient;
- this.proto = new ZkCmdExecutor(zkClient);
}
@Override
void runLeaderProcess() throws KeeperException, InterruptedException {
- proto.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkClient.makePath(leaderPath, leaderProps == null ? null
- : ZkStateReader.toJSON(leaderProps), CreateMode.EPHEMERAL);
- return null;
- }
- });
-
+ zkClient.makePath(leaderPath,
+ leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
+ CreateMode.EPHEMERAL, true);
}
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Sat Jan 7 17:17:07 2012
@@ -63,13 +63,12 @@ public class LeaderElector {
private final static Pattern LEADER_SEQ = Pattern.compile(".*?/?.*?-n_(\\d+)");
private final static Pattern SESSION_ID = Pattern.compile(".*?/?(.*?)-n_\\d+");
- private ZkCmdExecutor cmdExecutor;
-
protected SolrZkClient zkClient;
+ private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
+
public LeaderElector(SolrZkClient zkClient) {
this.zkClient = zkClient;
- cmdExecutor = new ZkCmdExecutor(zkClient);
}
/**
@@ -89,14 +88,7 @@ public class LeaderElector {
InterruptedException, IOException {
// get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE;
- List<String> seqs = cmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(holdElectionPath, null);
- }
- });
-
+ List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
sortSeqs(seqs);
List<Integer> intSeqs = getSeqs(seqs);
@@ -134,7 +126,7 @@ public class LeaderElector {
}
}
- }, null);
+ }, null, true);
} catch (KeeperException e) {
// we couldn't set our watch - the node before us may already be down?
// we need to check if we are the leader again
@@ -216,17 +208,11 @@ public class LeaderElector {
while (cont) {
try {
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
- CreateMode.EPHEMERAL_SEQUENTIAL);
+ CreateMode.EPHEMERAL_SEQUENTIAL, false);
cont = false;
} catch (ConnectionLossException e) {
// we don't know if we made our node or not...
- List<String> entries = cmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(shardsElectZkPath, null);
- }
- });
+ List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);
boolean foundId = false;
for (String entry : entries) {
@@ -266,12 +252,11 @@ public class LeaderElector {
* @throws InterruptedException
* @throws KeeperException
*/
- public void setup(final ElectionContext context)
- throws InterruptedException, KeeperException {
- String electZKPath = context.electionPath
- + LeaderElector.ELECTION_NODE;
+ public void setup(final ElectionContext context) throws InterruptedException,
+ KeeperException {
+ String electZKPath = context.electionPath + LeaderElector.ELECTION_NODE;
- cmdExecutor.ensureExists(electZKPath);
+ zkCmdExecutor.ensureExists(electZKPath, zkClient);
}
/**
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java Sat Jan 7 17:17:07 2012
@@ -78,7 +78,7 @@ public class NodeStateWatcher implements
}
private void processStateChange() throws KeeperException, InterruptedException {
- byte[] data = zkClient.getData(path, this, null);
+ byte[] data = zkClient.getData(path, this, null, true);
if (data != null) {
CoreState[] states = CoreState.load(data);
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java Sat Jan 7 17:17:07 2012
@@ -69,7 +69,7 @@ public class Overseer implements NodeSta
public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
log.info("Constructing new Overseer");
this.zkClient = zkClient;
- this.zkCmdExecutor = new ZkCmdExecutor(zkClient);
+ this.zkCmdExecutor = new ZkCmdExecutor();
this.reader = reader;
createWatches();
}
@@ -86,33 +86,28 @@ public class Overseer implements NodeSta
private void addCollectionsWatch() throws KeeperException,
InterruptedException {
- zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE);
+ zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
- List<String> collections = zkCmdExecutor.retryOperation(new ZkOperation() {
+ List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
@Override
- public Object execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
- @Override
- public void process(WatchedEvent event) {
- try {
- List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this);
- collectionsChanged(collections);
- } catch (KeeperException e) {
- if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
+ public void process(WatchedEvent event) {
+ try {
+ List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this, true);
+ collectionsChanged(collections);
+ } catch (KeeperException e) {
+ if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
}
- });
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
}
- });
+ }, true);
collectionsChanged(collections);
}
@@ -138,42 +133,36 @@ public class Overseer implements NodeSta
private void addShardLeadersWatch(final String collection) throws KeeperException,
InterruptedException {
- zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null));
+ zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null), zkClient);
- final List<String> leaderNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(
- ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
+ final List<String> leaderNodes = zkClient.getChildren(
+ ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ List<String> leaderNodes = zkClient.getChildren(
+ ZkStateReader.getShardLeadersPath(collection, null), this, true);
- @Override
- public void process(WatchedEvent event) {
- try {
- List<String> leaderNodes = zkClient.getChildren(
- ZkStateReader.getShardLeadersPath(collection, null), this);
-
- processLeaderNodesChanged(collection, leaderNodes);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
+ processLeaderNodesChanged(collection, leaderNodes);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
}
- });
- }
- });
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ }
+ }, true);
processLeaderNodesChanged(collection, leaderNodes);
}
@@ -229,7 +218,7 @@ public class Overseer implements NodeSta
public void process(WatchedEvent event) {
try {
List<String> liveNodes = zkClient.getChildren(
- ZkStateReader.LIVE_NODES_ZKNODE, this);
+ ZkStateReader.LIVE_NODES_ZKNODE, this, true);
Set<String> liveNodesSet = new HashSet<String>();
liveNodesSet.addAll(liveNodes);
processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
@@ -250,7 +239,7 @@ public class Overseer implements NodeSta
SolrException.ErrorCode.SERVER_ERROR, "", e);
}
}
- });
+ }, true);
}
});
@@ -277,7 +266,7 @@ public class Overseer implements NodeSta
final String path = STATES_NODE + "/" + nodeName;
synchronized (nodeStateWatches) {
if (!nodeStateWatches.containsKey(nodeName)) {
- zkCmdExecutor.ensureExists(path);
+ zkCmdExecutor.ensureExists(path, zkClient);
nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
} else {
log.debug("watch already added");
@@ -338,17 +327,10 @@ public class Overseer implements NodeSta
for (CoreState state : states) {
cloudState = updateState(cloudState, nodeName, state);
}
- final CloudState finalState = cloudState;
+
try {
- zkCmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(finalState));
- return null;
- }
- });
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(cloudState), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -363,8 +345,8 @@ public class Overseer implements NodeSta
log.info("creating node:" + node);
}
- ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(zkClient);
- zkCmdExecutor.ensureExists(node);
+ ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
+ zkCmdExecutor.ensureExists(node, zkClient);
}
private CloudState updateSlice(CloudState state, String collection, Slice slice) {
@@ -460,15 +442,8 @@ public class Overseer implements NodeSta
if (state != newState) { // if same instance was returned no need to
// update state
log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
- zkCmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(newState));
- return null;
- }
- });
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(newState), true);
} else {
log.debug("State was not changed.");
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java Sat Jan 7 17:17:07 2012
@@ -57,7 +57,7 @@ public class ShardLeaderWatcher implemen
private void processLeaderChange() throws KeeperException, InterruptedException {
if(closed) return;
try {
- byte[] data = zkClient.getData(path, this, null);
+ byte[] data = zkClient.getData(path, this, null, true);
if (data != null) {
final ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(ZkNodeProps.load(data));
listener.announceLeader(collection, shard, leaderProps);
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sat Jan 7 17:17:07 2012
@@ -33,7 +33,15 @@ import java.util.regex.Pattern;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.CoreState;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkOperation;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
@@ -41,7 +49,6 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,7 +187,7 @@ public final class ZkController {
}
});
- cmdExecutor = new ZkCmdExecutor(zkClient);
+ cmdExecutor = new ZkCmdExecutor();
leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
init();
@@ -212,7 +219,7 @@ public final class ZkController {
*/
public boolean configFileExists(String collection, String fileName)
throws KeeperException, InterruptedException {
- Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null);
+ Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null, true);
return stat != null;
}
@@ -233,7 +240,7 @@ public final class ZkController {
public byte[] getConfigFileData(String zkConfigName, String fileName)
throws KeeperException, InterruptedException {
String zkPath = CONFIGS_ZKNODE + "/" + zkConfigName + "/" + fileName;
- byte[] bytes = zkClient.getData(zkPath, null, null);
+ byte[] bytes = zkClient.getData(zkPath, null, null, true);
if (bytes == null) {
log.error("Config file contains no data:" + zkPath);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -291,11 +298,11 @@ public final class ZkController {
}
// makes nodes zkNode
- cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE);
+ cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
Overseer.createClientNodes(zkClient, getNodeName());
createEphemeralLiveNode();
- cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE);
+ cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
overseerElector = new LeaderElector(zkClient);
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
@@ -335,7 +342,7 @@ public final class ZkController {
// until expiration timeout - so a node won't be created here because
// it exists, but eventually the node will be removed. So delete
// in case it exists and create a new node.
- zkClient.delete(nodePath, -1);
+ zkClient.delete(nodePath, -1, true);
} catch (KeeperException.NoNodeException e) {
// fine if there is nothing to delete
// TODO: annoying that ZK logs a warning on us
@@ -346,7 +353,7 @@ public final class ZkController {
.info("Found a previous node that still exists while trying to register a new live node "
+ nodePath + " - removing existing node to create another.");
}
- zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+ zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -367,7 +374,7 @@ public final class ZkController {
*/
public boolean pathExists(String path) throws KeeperException,
InterruptedException {
- return zkClient.exists(path);
+ return zkClient.exists(path, true);
}
/**
@@ -386,14 +393,14 @@ public final class ZkController {
if (log.isInfoEnabled()) {
log.info("Load collection config from:" + path);
}
- byte[] data = zkClient.getData(path, null, null);
+ byte[] data = zkClient.getData(path, null, null, true);
if(data != null) {
ZkNodeProps props = ZkNodeProps.load(data);
configName = props.get(CONFIGNAME_PROP);
}
- if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName)) {
+ if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
log.error("Specified config does not exist in ZooKeeper:" + configName);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"Specified config does not exist in ZooKeeper:" + configName);
@@ -631,7 +638,7 @@ public final class ZkController {
String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
try {
- if(!zkClient.exists(collectionPath)) {
+ if(!zkClient.exists(collectionPath, true)) {
log.info("Creating collection in ZooKeeper:" + collection);
SolrParams params = cd.getParams();
@@ -675,14 +682,14 @@ public final class ZkController {
log.info("Looking for collection configName");
int retry = 1;
for (; retry < 6; retry++) {
- if (zkClient.exists(collectionPath)) {
- ZkNodeProps cProps = ZkNodeProps.load(zkClient.getData(collectionPath, null, null));
+ if (zkClient.exists(collectionPath, true)) {
+ ZkNodeProps cProps = ZkNodeProps.load(zkClient.getData(collectionPath, null, null, true));
if (cProps.containsKey(CONFIGNAME_PROP)) {
break;
}
}
// if there is only one conf, use that
- List<String> configNames = zkClient.getChildren(CONFIGS_ZKNODE, null);
+ List<String> configNames = zkClient.getChildren(CONFIGS_ZKNODE, null, true);
if (configNames.size() == 1) {
// no config set named, but there is only 1 - use it
log.info("Only one config set found in zk - using it:" + configNames.get(0));
@@ -704,7 +711,7 @@ public final class ZkController {
zkClient.makePath(collectionPath, ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
// ping that there is a new collection
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null, true);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -743,7 +750,7 @@ public final class ZkController {
public Object execute() throws KeeperException, InterruptedException {
zkClient.setData(
nodePath,
- ZkStateReader.toJSON(coreStates.values()));
+ ZkStateReader.toJSON(coreStates.values()), true);
return null;
}
});
@@ -790,7 +797,7 @@ public final class ZkController {
for(File file : files) {
if (!file.getName().startsWith(".")) {
if (!file.isDirectory()) {
- zkClient.makePath(zkPath + "/" + file.getName(), file);
+ zkClient.makePath(zkPath + "/" + file.getName(), file, true);
} else {
uploadToZK(zkClient, file, zkPath + "/" + file.getName());
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Sat Jan 7 17:17:07 2012
@@ -74,7 +74,7 @@ public class ZkSolrResourceLoader extend
String file = collectionZkPath + "/" + resource;
try {
if (zkController.pathExists(file)) {
- byte[] bytes = zkController.getZkClient().getData(collectionZkPath + "/" + resource, null, null);
+ byte[] bytes = zkController.getZkClient().getData(collectionZkPath + "/" + resource, null, null, true);
return new ByteArrayInputStream(bytes);
}
} catch (Exception e) {
@@ -105,7 +105,7 @@ public class ZkSolrResourceLoader extend
public String[] listConfigDir() {
List<String> list;
try {
- list = zkController.getZkClient().getChildren(collectionZkPath, null);
+ list = zkController.getZkClient().getChildren(collectionZkPath, null, true);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sat Jan 7 17:17:07 2012
@@ -18,7 +18,6 @@ package org.apache.solr.cloud;
*/
import java.io.File;
-import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,11 +26,9 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkOperation;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrConfig;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
@@ -76,76 +73,37 @@ public abstract class AbstractZkTestCase
static void buildZooKeeper(String zkHost, String zkAddress, String config,
String schema) throws Exception {
SolrZkClient zkClient = new SolrZkClient(zkHost, AbstractZkTestCase.TIMEOUT);
- zkClient.makePath("/solr");
+ zkClient.makePath("/solr", false, true);
zkClient.close();
zkClient = new SolrZkClient(zkAddress, AbstractZkTestCase.TIMEOUT);
- final ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(zkClient);
+ final ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
Map<String,String> props = new HashMap<String,String>();
props.put("configName", "conf1");
final ZkNodeProps zkProps = new ZkNodeProps(props);
- zkCmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkCmdExecutor.getZkClient().makePath("/collections/collection1", ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT);
- return null;
- }
- });
-
- zkCmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkCmdExecutor.getZkClient().makePath("/collections/collection1/shards", CreateMode.PERSISTENT);
- return null;
- }
- });
-
- zkCmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkCmdExecutor.getZkClient().makePath("/collections/control_collection", ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT);
- return null;
- }
- });
-
- zkCmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkCmdExecutor.getZkClient().makePath("/collections/control_collection/shards", CreateMode.PERSISTENT);
- return null;
- }
- });
-
-
- putConfig(zkCmdExecutor, config);
- putConfig(zkCmdExecutor, schema);
- putConfig(zkCmdExecutor, "solrconfig.xml");
- putConfig(zkCmdExecutor, "stopwords.txt");
- putConfig(zkCmdExecutor, "protwords.txt");
- putConfig(zkCmdExecutor, "mapping-ISOLatin1Accent.txt");
- putConfig(zkCmdExecutor, "old_synonyms.txt");
- putConfig(zkCmdExecutor, "synonyms.txt");
+ zkClient.makePath("/collections/collection1", ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, true);
+ zkClient.makePath("/collections/collection1/shards", CreateMode.PERSISTENT, true);
+ zkClient.makePath("/collections/control_collection", ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, true);
+ zkClient.makePath("/collections/control_collection/shards", CreateMode.PERSISTENT, true);
+
+ putConfig(zkClient, config);
+ putConfig(zkClient, schema);
+ putConfig(zkClient, "solrconfig.xml");
+ putConfig(zkClient, "stopwords.txt");
+ putConfig(zkClient, "protwords.txt");
+ putConfig(zkClient, "mapping-ISOLatin1Accent.txt");
+ putConfig(zkClient, "old_synonyms.txt");
+ putConfig(zkClient, "synonyms.txt");
zkClient.close();
}
- private static void putConfig(final ZkCmdExecutor zkCmdExecutor, final String name)
+ private static void putConfig(SolrZkClient zkClient, final String name)
throws Exception {
- zkCmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- try {
- zkCmdExecutor.getZkClient().makePath("/configs/conf1/" + name, getFile("solr"
- + File.separator + "conf" + File.separator + name), false);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return null;
- }
- });
-
+ zkClient.makePath("/configs/conf1/" + name, getFile("solr"
+ + File.separator + "conf" + File.separator + name), false, false);
}
@Override
@@ -178,7 +136,7 @@ public abstract class AbstractZkTestCase
public static void makeSolrZkNode(String zkHost) throws Exception {
SolrZkClient zkClient = new SolrZkClient(zkHost, TIMEOUT);
- zkClient.makePath("/solr", false);
+ zkClient.makePath("/solr", false, true);
zkClient.close();
}
@@ -188,12 +146,12 @@ public abstract class AbstractZkTestCase
static void tryCleanPath(String zkHost, String path) throws Exception {
SolrZkClient zkClient = new SolrZkClient(zkHost, TIMEOUT);
- if (zkClient.exists(path)) {
- List<String> children = zkClient.getChildren(path, null);
+ if (zkClient.exists(path, true)) {
+ List<String> children = zkClient.getChildren(path, null, true);
for (String string : children) {
tryCleanPath(zkHost, path+"/"+string);
}
- zkClient.delete(path, -1);
+ zkClient.delete(path, -1, true);
}
zkClient.close();
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java Sat Jan 7 17:17:07 2012
@@ -139,7 +139,7 @@ public class BasicZkTest extends Abstrac
// SOLR-2651: test that reload still gets config files from zookeeper
- zkController.getZkClient().setData("/configs/conf1/solrconfig.xml", new byte[0]);
+ zkController.getZkClient().setData("/configs/conf1/solrconfig.xml", new byte[0], true);
// we set the solrconfig to nothing, so this reload should fail
try {
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Sat Jan 7 17:17:07 2012
@@ -138,11 +138,14 @@ public class CloudStateUpdateTest extend
props2.put("configName", "conf1");
ZkNodeProps zkProps2 = new ZkNodeProps(props2);
- SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore", ZkStateReader.toJSON(zkProps2), CreateMode.PERSISTENT);
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore/shards", CreateMode.PERSISTENT);
+ SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(),
+ AbstractZkTestCase.TIMEOUT);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore",
+ ZkStateReader.toJSON(zkProps2), CreateMode.PERSISTENT, true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/testcore/shards",
+ CreateMode.PERSISTENT, true);
zkClient.close();
-
+
CoreDescriptor dcore = new CoreDescriptor(container1, "testcore",
"testcore");
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Sat Jan 7 17:17:07 2012
@@ -29,10 +29,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkOperation;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrConfig;
import org.apache.zookeeper.KeeperException;
@@ -50,7 +48,6 @@ public class LeaderElectionTest extends
private Map<Integer,Thread> seqToThread;
private volatile boolean stopStress = false;
- private ZkCmdExecutor zkCmdExecutor;
@BeforeClass
public static void beforeClass() throws Exception {
@@ -75,7 +72,6 @@ public class LeaderElectionTest extends
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
- zkCmdExecutor = new ZkCmdExecutor(zkClient);
seqToThread = new HashMap<Integer,Thread>();
}
@@ -153,12 +149,7 @@ public class LeaderElectionTest extends
int iterCount=30;
while (iterCount-- > 0)
try {
- byte[] data = zkCmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public byte[] execute() throws KeeperException, InterruptedException {
- return zkClient.getData(ZkStateReader.getShardLeadersPath(collection, slice), null, null);
- }
- });
+ byte[] data = zkClient.getData(ZkStateReader.getShardLeadersPath(collection, slice), null, null, true);
ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(ZkNodeProps.load(data));
return leaderProps.getCoreUrl();
} catch (NoNodeException e) {
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Sat Jan 7 17:17:07 2012
@@ -337,13 +337,13 @@ public class OverseerTest extends SolrTe
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
- zkClient.makePath("/live_nodes");
+ zkClient.makePath("/live_nodes", true);
System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "2");
//live node
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
- zkClient.makePath(nodePath,CreateMode.EPHEMERAL);
+ zkClient.makePath(nodePath,CreateMode.EPHEMERAL, true);
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
@@ -363,14 +363,14 @@ public class OverseerTest extends SolrTe
nodePath = "/node_states/node1";
try {
- zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+ zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
} catch (KeeperException ke) {
if(ke.code()!=Code.NODEEXISTS) {
throw ke;
}
}
//publish node state (recovering)
- zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}));
+ zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
//wait overseer assignment
waitForSliceCount(reader, "collection1", 1);
@@ -385,7 +385,7 @@ public class OverseerTest extends SolrTe
coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
state = new CoreState("core1", "collection1", coreProps);
- zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}));
+ zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
verifyStatus(reader, ZkStateReader.ACTIVE);
@@ -436,7 +436,7 @@ public class OverseerTest extends SolrTe
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
- controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
+ controllerClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
reader = new ZkStateReader(controllerClient);
reader.createClusterStateWatchersAndUpdate();
@@ -446,7 +446,7 @@ public class OverseerTest extends SolrTe
// live node
final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
- controllerClient.makePath(nodePath, CreateMode.EPHEMERAL);
+ controllerClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
HashMap<String,String> coreProps = new HashMap<String,String>();
coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
@@ -455,7 +455,7 @@ public class OverseerTest extends SolrTe
final String statePath = Overseer.STATES_NODE + "/node1";
// publish node state (recovering)
- controllerClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}));
+ controllerClient.setData(statePath, ZkStateReader.toJSON(new CoreState[] {state}), true);
// wait overseer assignment
waitForSliceCount(reader, "collection1", 1);
@@ -467,7 +467,7 @@ public class OverseerTest extends SolrTe
coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
state = new CoreState("core1", "collection1", coreProps);
controllerClient.setData(statePath,
- ZkStateReader.toJSON(new CoreState[] {state}));
+ ZkStateReader.toJSON(new CoreState[] {state}), true);
verifyStatus(reader, ZkStateReader.ACTIVE);
overseerClient.close();
@@ -476,7 +476,7 @@ public class OverseerTest extends SolrTe
state = new CoreState("core1", "collection1", coreProps);
controllerClient.setData(statePath,
- ZkStateReader.toJSON(new CoreState[] {state}));
+ ZkStateReader.toJSON(new CoreState[] {state}), true);
overseerClient = electNewOverseer(server.getZkAddress());
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sat Jan 7 17:17:07 2012
@@ -60,12 +60,14 @@ public class ZkControllerTest extends So
SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
String actualConfigName = "firstConfig";
- zkClient.makePath(ZkController.CONFIGS_ZKNODE + "/" + actualConfigName);
+ zkClient.makePath(ZkController.CONFIGS_ZKNODE + "/" + actualConfigName, true);
Map<String,String> props = new HashMap<String,String>();
props.put("configName", actualConfigName);
ZkNodeProps zkProps = new ZkNodeProps(props);
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION_NAME , ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/"
+ + COLLECTION_NAME, ZkStateReader.toJSON(zkProps),
+ CreateMode.PERSISTENT, true);
if (DEBUG) {
zkClient.printLayoutToStdOut();
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java Sat Jan 7 17:17:07 2012
@@ -60,7 +60,7 @@ public class ZkSolrClientTest extends Ab
SolrZkClient zkClient = new SolrZkClient(server.getZkHost(),
AbstractZkTestCase.TIMEOUT);
- assertTrue(zkClient.exists("/solr"));
+ assertTrue(zkClient.exists("/solr", true));
zkClient.close();
server.shutdown();
@@ -79,9 +79,9 @@ public class ZkSolrClientTest extends Ab
zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT);
String shardsPath = "/collections/collection1/shards";
- zkClient.makePath(shardsPath);
+ zkClient.makePath(shardsPath, false, true);
- zkClient.makePath("collections/collection1", false);
+ zkClient.makePath("collections/collection1", false, true);
int zkServerPort = server.getPort();
// this tests disconnect state
server.shutdown();
@@ -90,7 +90,7 @@ public class ZkSolrClientTest extends Ab
try {
- zkClient.makePath("collections/collection2");
+ zkClient.makePath("collections/collection2", false);
TestCase.fail("Server should be down here");
} catch (KeeperException.ConnectionLossException e) {
@@ -105,18 +105,18 @@ public class ZkSolrClientTest extends Ab
Thread.sleep(600);
try {
- zkClient.makePath("collections/collection3");
+ zkClient.makePath("collections/collection3", true);
} catch (KeeperException.ConnectionLossException e) {
Thread.sleep(5000); // try again in a bit
- zkClient.makePath("collections/collection3");
+ zkClient.makePath("collections/collection3", true);
}
if (DEBUG) {
zkClient.printLayoutToStdOut();
}
- assertNotNull(zkClient.exists("/collections/collection3", null));
- assertNotNull(zkClient.exists("/collections/collection1", null));
+ assertNotNull(zkClient.exists("/collections/collection3", null, true));
+ assertNotNull(zkClient.exists("/collections/collection1", null, true));
// simulate session expiration
@@ -133,7 +133,7 @@ public class ZkSolrClientTest extends Ab
for (int i = 0; i < 8; i++) {
try {
- zkClient.makePath("collections/collection4");
+ zkClient.makePath("collections/collection4", true);
break;
} catch (KeeperException.SessionExpiredException e) {
@@ -147,7 +147,7 @@ public class ZkSolrClientTest extends Ab
zkClient.printLayoutToStdOut();
}
- assertNotNull("Node does not exist, but it should", zkClient.exists("/collections/collection4", null));
+ assertNotNull("Node does not exist, but it should", zkClient.exists("/collections/collection4", null, true));
} finally {
@@ -171,7 +171,7 @@ public class ZkSolrClientTest extends Ab
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
final SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT);
try {
- zkClient.makePath("/collections");
+ zkClient.makePath("/collections", true);
zkClient.getChildren("/collections", new Watcher() {
@@ -182,22 +182,22 @@ public class ZkSolrClientTest extends Ab
cnt.incrementAndGet();
// remake watch
try {
- zkClient.getChildren("/collections", this);
+ zkClient.getChildren("/collections", this, true);
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
- });
+ }, true);
- zkClient.makePath("/collections/collection99/shards");
+ zkClient.makePath("/collections/collection99/shards", true);
- zkClient.makePath("collections/collection99/config=collection1");
+ zkClient.makePath("collections/collection99/config=collection1", true);
- zkClient.makePath("collections/collection99/config=collection3");
+ zkClient.makePath("collections/collection99/config=collection3", true);
- zkClient.makePath("/collections/collection97/shards");
+ zkClient.makePath("/collections/collection97/shards", true);
if (DEBUG) {
zkClient.printLayoutToStdOut();
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Sat Jan 7 17:17:07 2012
@@ -154,7 +154,7 @@ public class CloudState implements JSONW
public static CloudState load(SolrZkClient zkClient, Set<String> liveNodes) throws KeeperException, InterruptedException {
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
- null, null);
+ null, null, true);
return load(state, liveNodes);
}
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Sat Jan 7 17:17:07 2012
@@ -70,6 +70,8 @@ public class SolrZkClient {
private ConnectionManager connManager;
private volatile SolrZooKeeper keeper;
+
+ private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
private volatile boolean isClosed = false;
@@ -151,12 +153,23 @@ public class SolrZkClient {
/**
* @param path
* @param version
+ * @param retryOnConnLoss
* @throws InterruptedException
* @throws KeeperException
*/
- public void delete(final String path, int version)
+ public void delete(final String path, final int version, boolean retryOnConnLoss)
throws InterruptedException, KeeperException {
- keeper.delete(path, version);
+ if (retryOnConnLoss) {
+ zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Stat execute() throws KeeperException, InterruptedException {
+ keeper.delete(path, version);
+ return null;
+ }
+ });
+ } else {
+ keeper.delete(path, version);
+ }
}
/**
@@ -170,26 +183,46 @@ public class SolrZkClient {
*
* @param path the node path
* @param watcher explicit watcher
+ * @param retryOnConnLoss
* @return the stat of the node of the given path; return null if no such a
* node exists.
* @throws KeeperException If the server signals an error
* @throws InterruptedException If the server transaction is interrupted.
* @throws IllegalArgumentException if an invalid path is specified
*/
- public Stat exists(final String path, Watcher watcher)
+ public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
- return keeper.exists(path, watcher);
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Stat execute() throws KeeperException, InterruptedException {
+ return keeper.exists(path, watcher);
+ }
+ });
+ } else {
+ return keeper.exists(path, watcher);
+ }
}
/**
* @param path
* @return true if path exists
* @throws KeeperException
+ * @param retryOnConnLoss
* @throws InterruptedException
*/
- public boolean exists(final String path)
+ public Boolean exists(final String path, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
- return keeper.exists(path, null) != null;
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Boolean execute() throws KeeperException, InterruptedException {
+ return keeper.exists(path, null) != null;
+ }
+ });
+ } else {
+ return keeper.exists(path, null) != null;
+ }
}
/**
@@ -197,51 +230,91 @@ public class SolrZkClient {
* @param data
* @param acl
* @param createMode
+ * @param retryOnConnLoss
* @return path of created node
* @throws KeeperException
* @throws InterruptedException
*/
- public String create(final String path, byte data[], List<ACL> acl,
- CreateMode createMode) throws KeeperException, InterruptedException {
- return keeper.create(path, data, acl, createMode);
+ public String create(final String path, final byte data[], final List<ACL> acl,
+ final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public String execute() throws KeeperException, InterruptedException {
+ return keeper.create(path, data, acl, createMode);
+ }
+ });
+ } else {
+ return keeper.create(path, data, acl, createMode);
+ }
}
/**
* @param path
* @param watcher
+ * @param retryOnConnLoss
* @return children of the node at the path
* @throws KeeperException
* @throws InterruptedException
*/
- public List<String> getChildren(final String path, Watcher watcher)
+ public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
- return keeper.getChildren(path, watcher);
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public List<String> execute() throws KeeperException, InterruptedException {
+ return keeper.getChildren(path, watcher);
+ }
+ });
+ } else {
+ return keeper.getChildren(path, watcher);
+ }
}
/**
* @param path
* @param watcher
* @param stat
+ * @param retryOnConnLoss
* @return node's data
* @throws KeeperException
* @throws InterruptedException
*/
- public byte[] getData(final String path, Watcher watcher, Stat stat)
+ public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
- return keeper.getData(path, watcher, stat);
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public byte[] execute() throws KeeperException, InterruptedException {
+ return keeper.getData(path, watcher, stat);
+ }
+ });
+ } else {
+ return keeper.getData(path, watcher, stat);
+ }
}
/**
* @param path
* @param data
* @param version
+ * @param retryOnConnLoss
* @return node's state
* @throws KeeperException
* @throws InterruptedException
*/
- public Stat setData(final String path, byte data[], int version)
+ public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
- return keeper.setData(path, data, version);
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Stat execute() throws KeeperException, InterruptedException {
+ return keeper.setData(path, data, version);
+ }
+ });
+ } else {
+ return keeper.setData(path, data, version);
+ }
}
/**
@@ -250,14 +323,24 @@ public class SolrZkClient {
* @param data
* @param createMode
* @return path of created node
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
- public String create(String path, byte[] data, CreateMode createMode) throws KeeperException, InterruptedException {
-
- String zkPath = keeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
-
- return zkPath;
+ public String create(final String path, final byte[] data,
+ final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
+ InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public String execute() throws KeeperException, InterruptedException {
+ return keeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ createMode);
+ }
+ });
+ } else {
+ return keeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
+ }
}
/**
@@ -267,33 +350,34 @@ public class SolrZkClient {
* group, node exist, each will be created.
*
* @param path
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
- public void makePath(String path) throws KeeperException,
+ public void makePath(String path, boolean retryOnConnLoss) throws KeeperException,
InterruptedException {
- makePath(path, null, CreateMode.PERSISTENT);
+ makePath(path, null, CreateMode.PERSISTENT, retryOnConnLoss);
}
- public void makePath(String path, boolean failOnExists) throws KeeperException,
+ public void makePath(String path, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException,
InterruptedException {
- makePath(path, null, CreateMode.PERSISTENT, null, failOnExists);
+ makePath(path, null, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
}
- public void makePath(String path, File file, boolean failOnExists)
+ public void makePath(String path, File file, boolean failOnExists, boolean retryOnConnLoss)
throws IOException, KeeperException, InterruptedException {
makePath(path, FileUtils.readFileToString(file).getBytes("UTF-8"),
- CreateMode.PERSISTENT, null, failOnExists);
+ CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
}
- public void makePath(String path, File file) throws IOException,
+ public void makePath(String path, File file, boolean retryOnConnLoss) throws IOException,
KeeperException, InterruptedException {
- makePath(path, FileUtils.readFileToString(file).getBytes("UTF-8"));
+ makePath(path, FileUtils.readFileToString(file).getBytes("UTF-8"), retryOnConnLoss);
}
- public void makePath(String path, CreateMode createMode) throws KeeperException,
+ public void makePath(String path, CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
InterruptedException {
- makePath(path, null, createMode);
+ makePath(path, null, createMode, retryOnConnLoss);
}
/**
@@ -301,12 +385,13 @@ public class SolrZkClient {
*
* @param path
* @param data to set on the last zkNode
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
- public void makePath(String path, byte[] data) throws KeeperException,
+ public void makePath(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException,
InterruptedException {
- makePath(path, data, CreateMode.PERSISTENT);
+ makePath(path, data, CreateMode.PERSISTENT, retryOnConnLoss);
}
/**
@@ -318,12 +403,13 @@ public class SolrZkClient {
* @param path
* @param data to set on the last zkNode
* @param createMode
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
- public void makePath(String path, byte[] data, CreateMode createMode)
+ public void makePath(String path, byte[] data, CreateMode createMode, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
- makePath(path, data, createMode, null);
+ makePath(path, data, createMode, null, retryOnConnLoss);
}
/**
@@ -336,17 +422,18 @@ public class SolrZkClient {
* @param data to set on the last zkNode
* @param createMode
* @param watcher
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
public void makePath(String path, byte[] data, CreateMode createMode,
- Watcher watcher) throws KeeperException, InterruptedException {
- makePath(path, data, createMode, watcher, true);
+ Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
+ makePath(path, data, createMode, watcher, true, retryOnConnLoss);
}
+
/**
- *
* Creates the path in ZooKeeper, creating each node as necessary.
*
* e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
@@ -357,11 +444,12 @@ public class SolrZkClient {
* @param createMode
* @param watcher
* @param failOnExists
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
public void makePath(String path, byte[] data, CreateMode createMode,
- Watcher watcher, boolean failOnExists) throws KeeperException, InterruptedException {
+ Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
if (log.isInfoEnabled()) {
log.info("makePath: " + path);
}
@@ -375,8 +463,8 @@ public class SolrZkClient {
byte[] bytes = null;
String pathPiece = paths[i];
sbPath.append("/" + pathPiece);
- String currentPath = sbPath.toString();
- Object exists = exists(currentPath, watcher);
+ final String currentPath = sbPath.toString();
+ Object exists = exists(currentPath, watcher, retryOnConnLoss);
if (exists == null || ((i == paths.length -1) && failOnExists)) {
CreateMode mode = CreateMode.PERSISTENT;
if (i == paths.length - 1) {
@@ -384,7 +472,19 @@ public class SolrZkClient {
bytes = data;
}
try {
- keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+ if (retryOnConnLoss) {
+ final CreateMode finalMode = mode;
+ final byte[] finalBytes = bytes;
+ zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Object execute() throws KeeperException, InterruptedException {
+ keeper.create(currentPath, finalBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, finalMode);
+ return null;
+ }
+ });
+ } else {
+ keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+ }
} catch (NodeExistsException e) {
// ignore unless it's the last node in the path
if (i == paths.length - 1) {
@@ -393,13 +493,13 @@ public class SolrZkClient {
}
if(i == paths.length -1) {
// set new watch
- exists(currentPath, watcher);
+ exists(currentPath, watcher, retryOnConnLoss);
}
} else if (i == paths.length - 1) {
// TODO: version ? for now, don't worry about race
- setData(currentPath, data, -1);
+ setData(currentPath, data, -1, retryOnConnLoss);
// set new watch
- exists(currentPath, watcher);
+ exists(currentPath, watcher, retryOnConnLoss);
}
}
}
@@ -408,12 +508,13 @@ public class SolrZkClient {
* @param zkPath
* @param createMode
* @param watcher
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
- public void makePath(String zkPath, CreateMode createMode, Watcher watcher)
+ public void makePath(String zkPath, CreateMode createMode, Watcher watcher, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
- makePath(zkPath, null, createMode, watcher);
+ makePath(zkPath, null, createMode, watcher, retryOnConnLoss);
}
/**
@@ -421,12 +522,13 @@ public class SolrZkClient {
*
* @param path
* @param data
+ * @param retryOnConnLoss
* @throws KeeperException
* @throws InterruptedException
*/
- public void setData(String path, byte[] data) throws KeeperException,
+ public void setData(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException,
InterruptedException {
- setData(path, data, -1);
+ setData(path, data, -1, retryOnConnLoss);
}
/**
@@ -434,18 +536,19 @@ public class SolrZkClient {
*
* @param path path to upload file to e.g. /solr/conf/solrconfig.xml
* @param file path to file to be uploaded
+ * @param retryOnConnLoss
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
- public void setData(String path, File file) throws IOException,
+ public void setData(String path, File file, boolean retryOnConnLoss) throws IOException,
KeeperException, InterruptedException {
if (log.isInfoEnabled()) {
log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path);
}
String data = FileUtils.readFileToString(file);
- setData(path, data.getBytes("UTF-8"));
+ setData(path, data.getBytes("UTF-8"), retryOnConnLoss);
}
/**
@@ -458,8 +561,8 @@ public class SolrZkClient {
*/
public void printLayout(String path, int indent, StringBuilder string)
throws KeeperException, InterruptedException {
- byte[] data = getData(path, null, null);
- List<String> children = getChildren(path, null);
+ byte[] data = getData(path, null, null, true);
+ List<String> children = getChildren(path, null, true);
StringBuilder dent = new StringBuilder();
for (int i = 0; i < indent; i++) {
dent.append(" ");
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java Sat Jan 7 17:17:07 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.List;
import org.apache.log4j.Logger;
-import org.apache.solr.common.SolrException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -32,13 +31,13 @@ import org.apache.zookeeper.data.ACL;
public class ZkCmdExecutor {
private static final Logger LOG = Logger.getLogger(ZkCmdExecutor.class);
- protected final SolrZkClient zkClient;
+ //protected final SolrZkClient zkClient;
private long retryDelay = 1000L;
private int retryCount = 15;
private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- public ZkCmdExecutor(SolrZkClient solrZkClient) {
- this.zkClient = solrZkClient;
+ public ZkCmdExecutor() {
+ //this.zkClient = solrZkClient;
}
/**
@@ -102,20 +101,20 @@ public class ZkCmdExecutor {
throw exception;
}
- public void ensureExists(String path) {
- ensureExists(path, null, CreateMode.PERSISTENT);
+ public void ensureExists(String path, final SolrZkClient zkClient) {
+ ensureExists(path, null, CreateMode.PERSISTENT, zkClient);
}
public void ensureExists(final String path, final byte[] data,
- CreateMode createMode) {
+ CreateMode createMode, final SolrZkClient zkClient) {
try {
retryOperation(new ZkOperation() {
public Object execute() throws KeeperException, InterruptedException {
- if (zkClient.exists(path)) {
+ if (zkClient.exists(path, false)) {
return true;
}
try {
- zkClient.makePath(path, data);
+ zkClient.makePath(path, data, false);
} catch (NodeExistsException e) {
// its okay if another beats us creating the node
}
@@ -145,7 +144,4 @@ public class ZkCmdExecutor {
}
}
- public SolrZkClient getZkClient() {
- return zkClient;
- }
}
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Sat Jan 7 17:17:07 2012
@@ -36,11 +36,9 @@ import org.apache.noggit.JSONParser;
import org.apache.noggit.JSONWriter;
import org.apache.noggit.ObjectBuilder;
import org.apache.solr.common.SolrException;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,11 +118,10 @@ public class ZkStateReader {
private boolean closeClient = false;
- private ZkCmdExecutor cmdExecutor;
+ private ZkCmdExecutor cmdExecutor = new ZkCmdExecutor();
public ZkStateReader(SolrZkClient zkClient) {
this.zkClient = zkClient;
- cmdExecutor = new ZkCmdExecutor(zkClient);
}
public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
@@ -150,7 +147,6 @@ public class ZkStateReader {
}
});
- cmdExecutor = new ZkCmdExecutor(zkClient);
}
// load and publish a new CollectionInfo
@@ -168,40 +164,13 @@ public class ZkStateReader {
// We need to fetch the current cluster state and the set of live nodes
synchronized (getUpdateLock()) {
- Boolean exists = cmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Boolean execute() throws KeeperException, InterruptedException {
- return zkClient.exists(CLUSTER_STATE);
- }
- });
-
- if (!exists) {
- try {
- cmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- zkClient.create(CLUSTER_STATE, null, CreateMode.PERSISTENT);
- return null;
- }
- });
-
-
- } catch (NodeExistsException e) {
- // if someone beats us to creating this ignore it
- }
- }
- }
+ cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
+
log.info("Updating cluster state from ZooKeeper... ");
- cmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- // TODO Auto-generated method stub
- return zkClient.exists(CLUSTER_STATE, new Watcher() {
+
+ zkClient.exists(CLUSTER_STATE, new Watcher() {
@Override
public void process(WatchedEvent event) {
@@ -213,13 +182,7 @@ public class ZkStateReader {
synchronized (ZkStateReader.this.getUpdateLock()) {
// remake watch
final Watcher thisWatch = this;
- byte[] data = cmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public byte[] execute() throws KeeperException,
- InterruptedException {
- return zkClient.getData(CLUSTER_STATE, thisWatch, null);
- }
- });
+ byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null, true);
CloudState clusterState = CloudState.load(data,
ZkStateReader.this.cloudState.getLiveNodes());
@@ -243,16 +206,12 @@ public class ZkStateReader {
}
}
- });
+ }, true);
}
- });
synchronized (ZkStateReader.this.getUpdateLock()) {
- List<String> liveNodes = cmdExecutor.retryOperation(new ZkOperation() {
- @Override
- public List<String> execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(LIVE_NODES_ZKNODE,
+ List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
new Watcher() {
@Override
@@ -263,7 +222,7 @@ public class ZkStateReader {
// ZkStateReader.this.updateCloudState(false, true);
synchronized (ZkStateReader.this.getUpdateLock()) {
List<String> liveNodes = zkClient.getChildren(
- LIVE_NODES_ZKNODE, this);
+ LIVE_NODES_ZKNODE, this, true);
Set<String> liveNodesSet = new HashSet<String>();
liveNodesSet.addAll(liveNodes);
CloudState clusterState = new CloudState(liveNodesSet,
@@ -287,9 +246,8 @@ public class ZkStateReader {
}
}
- });
- }
- });
+ }, true);
+
Set<String> liveNodeSet = new HashSet<String>();
liveNodeSet.addAll(liveNodes);
CloudState clusterState = CloudState.load(zkClient, liveNodeSet);
@@ -308,13 +266,7 @@ public class ZkStateReader {
if (immediate) {
CloudState clusterState;
synchronized (getUpdateLock()) {
- List<String> liveNodes = cmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(LIVE_NODES_ZKNODE, null);
- }
- });
+ List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null, true);
Set<String> liveNodesSet = new HashSet<String>();
liveNodesSet.addAll(liveNodes);
@@ -345,14 +297,8 @@ public class ZkStateReader {
cloudStateUpdateScheduled = false;
CloudState clusterState;
try {
- List<String> liveNodes = cmdExecutor.retryOperation(new ZkOperation() {
-
- @Override
- public Object execute() throws KeeperException, InterruptedException {
- return zkClient.getChildren(LIVE_NODES_ZKNODE,
- null);
- }
- });
+ List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
+ null, true);
Set<String> liveNodesSet = new HashSet<String>();
liveNodesSet.addAll(liveNodes);
Modified: lucene/dev/branches/solrcloud/solr/webapp/web/admin/zookeeper.jsp
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/webapp/web/admin/zookeeper.jsp?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/webapp/web/admin/zookeeper.jsp (original)
+++ lucene/dev/branches/solrcloud/solr/webapp/web/admin/zookeeper.jsp Sat Jan 7 17:17:07 2012
@@ -310,7 +310,7 @@
Stat stat = new Stat();
try {
- byte[] data = zkClient.getData(path, null, stat);
+ byte[] data = zkClient.getData(path, null, stat, true);
if (stat.getEphemeralOwner() != 0)
out.print("ephemeral ");
@@ -362,7 +362,7 @@
List<String> children = null;
try {
- children = zkClient.getChildren(path, null);
+ children = zkClient.getChildren(path, null, true);
} catch (KeeperException e) {
exception(e);
return;
@@ -389,7 +389,7 @@
try {
Stat stat = new Stat();
- byte[] data = zkClient.getData(path, null, stat);
+ byte[] data = zkClient.getData(path, null, stat, true);
out.print("<h2>");
xmlescape(path);
Modified: lucene/dev/branches/solrcloud/solr/webapp/web/zookeeper.jsp
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/webapp/web/zookeeper.jsp?rev=1228672&r1=1228671&r2=1228672&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/webapp/web/zookeeper.jsp (original)
+++ lucene/dev/branches/solrcloud/solr/webapp/web/zookeeper.jsp Sat Jan 7 17:17:07 2012
@@ -286,7 +286,7 @@ static class ZKPrinter
Stat stat = new Stat();
try
{
- byte[] data = zkClient.getData(path, null, stat);
+ byte[] data = zkClient.getData(path, null, stat, true);
if( stat.getEphemeralOwner() != 0 )
{
@@ -361,7 +361,7 @@ static class ZKPrinter
List<String> children = null;
try
{
- children = zkClient.getChildren(path, null);
+ children = zkClient.getChildren(path, null, true);
}
catch (KeeperException e)
{
@@ -407,7 +407,7 @@ static class ZKPrinter
try
{
Stat stat = new Stat();
- byte[] data = zkClient.getData(path, null, stat);
+ byte[] data = zkClient.getData(path, null, stat, true);
out.println("\"znode\" : {");