You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/06 22:36:07 UTC
svn commit: r1228419 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Fri Jan 6 21:36:07 2012
New Revision: 1228419
URL: http://svn.apache.org/viewvc?rev=1228419&view=rev
Log:
more connection loss hardening
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Fri Jan 6 21:36:07 2012
@@ -271,18 +271,7 @@ public class LeaderElector {
String electZKPath = context.electionPath
+ LeaderElector.ELECTION_NODE;
- try {
-
- // leader election node
- if (!zkClient.exists(electZKPath)) { // on connection loss we throw out an exception
-
- // make new leader election node
- zkClient.makePath(electZKPath, CreateMode.PERSISTENT, null);
-
- }
- } catch (NodeExistsException e) {
- // its okay if another beats us creating the node
- }
+ cmdExecutor.ensureExists(electZKPath);
}
/**
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Jan 6 21:36:07 2012
@@ -34,14 +34,14 @@ import org.apache.solr.common.cloud.Clou
import org.apache.solr.common.cloud.CoreState;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkOperation;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
@@ -64,10 +64,12 @@ public class Overseer implements NodeSta
// shard leader watchers (collection->slice->watcher)
private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
+ private ZkCmdExecutor zkCmdExecutor;
public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
log.info("Constructing new Overseer");
this.zkClient = zkClient;
+ this.zkCmdExecutor = new ZkCmdExecutor(zkClient);
this.reader = reader;
createWatches();
}
@@ -84,32 +86,31 @@ public class Overseer implements NodeSta
private void addCollectionsWatch() throws KeeperException,
InterruptedException {
- if(!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
- try {
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE);
- } catch (NodeExistsException ne) {
- //ok
- }
- }
+ zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE);
- List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+ List<String> collections = zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
- public void process(WatchedEvent event) {
- try {
- List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this);
- collectionsChanged(collections);
- } catch (KeeperException e) {
- if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
+ public Object execute() throws KeeperException, InterruptedException {
+ return zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this);
+ collectionsChanged(collections);
+ } catch (KeeperException e) {
+ if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
}
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
+ });
}
});
@@ -137,42 +138,42 @@ public class Overseer implements NodeSta
private void addShardLeadersWatch(final String collection) throws KeeperException,
InterruptedException {
- if(!zkClient.exists(ZkStateReader.getShardLeadersPath(collection, null))) {
- try {
- zkClient.makePath(ZkStateReader.getShardLeadersPath(collection, null));
- } catch (NodeExistsException ne) {
- //ok if someone created it
- }
- }
+ zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null));
- final List<String> leaderNodes = zkClient.getChildren(
- ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- try {
- List<String> leaderNodes = zkClient.getChildren(
- ZkStateReader.getShardLeadersPath(collection, null), this);
+ final List<String> leaderNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
+
+ @Override
+ public Object execute() throws KeeperException, InterruptedException {
+ return zkClient.getChildren(
+ ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
- processLeaderNodesChanged(collection, leaderNodes);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ List<String> leaderNodes = zkClient.getChildren(
+ ZkStateReader.getShardLeadersPath(collection, null), this);
+
+ processLeaderNodesChanged(collection, leaderNodes);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
}
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
- }
- });
+ });
+ }
+ });
processLeaderNodesChanged(collection, leaderNodes);
}
@@ -217,35 +218,41 @@ public class Overseer implements NodeSta
private void addLiveNodesWatch() throws KeeperException,
InterruptedException {
- List<String> liveNodes = zkClient.getChildren(
- ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- try {
- List<String> liveNodes = zkClient.getChildren(
- ZkStateReader.LIVE_NODES_ZKNODE, this);
- Set<String> liveNodesSet = new HashSet<String>();
- liveNodesSet.addAll(liveNodes);
- processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
+ List<String> liveNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
+
+ @Override
+ public Object execute() throws KeeperException, InterruptedException {
+ return zkClient.getChildren(
+ ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ List<String> liveNodes = zkClient.getChildren(
+ ZkStateReader.LIVE_NODES_ZKNODE, this);
+ Set<String> liveNodesSet = new HashSet<String>();
+ liveNodesSet.addAll(liveNodes);
+ processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
}
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
- }
- });
+ });
+ }
+ });
processLiveNodesChanged(Collections.<String>emptySet(), liveNodes);
}
@@ -270,13 +277,7 @@ public class Overseer implements NodeSta
final String path = STATES_NODE + "/" + nodeName;
synchronized (nodeStateWatches) {
if (!nodeStateWatches.containsKey(nodeName)) {
- try {
- if (!zkClient.exists(path)) {
- zkClient.makePath(path);
- }
- } catch (NodeExistsException e) {
- // thats okay...
- }
+ zkCmdExecutor.ensureExists(path);
nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
} else {
log.debug("watch already added");
@@ -337,10 +338,17 @@ public class Overseer implements NodeSta
for (CoreState state : states) {
cloudState = updateState(cloudState, nodeName, state);
}
-
+ final CloudState finalState = cloudState;
try {
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(cloudState));
+ zkCmdExecutor.retryOperation(new ZkOperation() {
+
+ @Override
+ public Object execute() throws KeeperException, InterruptedException {
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(finalState));
+ return null;
+ }
+ });
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -355,14 +363,8 @@ public class Overseer implements NodeSta
log.info("creating node:" + node);
}
- try {
- if (!zkClient.exists(node)) {
- zkClient.makePath(node, CreateMode.PERSISTENT, null);
- }
-
- } catch (NodeExistsException e) {
- // it's ok
- }
+ ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(zkClient);
+ zkCmdExecutor.ensureExists(node);
}
private CloudState updateSlice(CloudState state, String collection, Slice slice) {
@@ -453,13 +455,21 @@ public class Overseer implements NodeSta
try {
reader.updateCloudState(true); // get fresh copy of the state
final CloudState state = reader.getCloudState();
- CloudState newState = setShardLeader(state, collection, shardId,
+ final CloudState newState = setShardLeader(state, collection, shardId,
props.getCoreUrl());
if (state != newState) { // if same instance was returned no need to
// update state
log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
- zkClient.setData(ZkStateReader.CLUSTER_STATE,
- ZkStateReader.toJSON(newState));
+ zkCmdExecutor.retryOperation(new ZkOperation() {
+
+ @Override
+ public Object execute() throws KeeperException, InterruptedException {
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(newState));
+ return null;
+ }
+ });
+
} else {
log.debug("State was not changed.");
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Jan 6 21:36:07 2012
@@ -291,20 +291,11 @@ public final class ZkController {
}
// makes nodes zkNode
- try {
- zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
- } catch (KeeperException e) {
- // its okay if another beats us creating the node
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- }
+ cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE);
Overseer.createClientNodes(zkClient, getNodeName());
createEphemeralLiveNode();
- setUpCollectionsNode();
+ cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE);
overseerElector = new LeaderElector(zkClient);
ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
@@ -633,20 +624,6 @@ public final class ZkController {
zkClient.printLayoutToStdOut();
}
- private void setUpCollectionsNode() throws KeeperException, InterruptedException {
- try {
- if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
- if (log.isInfoEnabled()) {
- log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
- }
- // makes collections zkNode if it doesn't exist
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
- }
- } catch (NodeExistsException e) {
- // its okay if another beats us creating the node
- }
- }
-
public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
String collection = cd.getCollectionName();
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java Fri Jan 6 21:36:07 2012
@@ -21,8 +21,10 @@ import java.io.IOException;
import java.util.List;
import org.apache.log4j.Logger;
+import org.apache.solr.common.SolrException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -100,34 +102,25 @@ public class ZkCmdExecutor {
throw exception;
}
- /**
- * Ensures that the given path exists with no data, the current ACL and no
- * flags
- *
- * @param path
- * @throws IOException
- */
- protected void ensurePathExists(String path) {
- ensureExists(path, null, acl, CreateMode.PERSISTENT);
+ public void ensureExists(String path) {
+ ensureExists(path, null, CreateMode.PERSISTENT);
}
- /**
- * Ensures that the given path exists with the given data, ACL and flags
- *
- * @param path
- * @param acl
- * @param flags
- * @throws IOException
- */
- protected void ensureExists(final String path, final byte[] data,
- final List<ACL> acl, final CreateMode flags) {
+ public void ensureExists(final String path, final byte[] data,
+ CreateMode createMode) {
try {
retryOperation(new ZkOperation() {
public Object execute() throws KeeperException, InterruptedException {
if (zkClient.exists(path)) {
return true;
}
- zkClient.create(path, data, acl, flags);
+ try {
+ zkClient.makePath(path, data);
+ } catch (NodeExistsException e) {
+ // its okay if another beats us creating the node
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
return true;
}
});