You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/06 22:36:07 UTC

svn commit: r1228419 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: markrmiller
Date: Fri Jan  6 21:36:07 2012
New Revision: 1228419

URL: http://svn.apache.org/viewvc?rev=1228419&view=rev
Log:
more connection loss hardening

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Fri Jan  6 21:36:07 2012
@@ -271,18 +271,7 @@ public  class LeaderElector {
     String electZKPath = context.electionPath
         + LeaderElector.ELECTION_NODE;
     
-    try {
-      
-      // leader election node
-      if (!zkClient.exists(electZKPath)) { // on connection loss we throw out an exception
-        
-        // make new leader election node
-        zkClient.makePath(electZKPath, CreateMode.PERSISTENT, null);
-        
-      }
-    } catch (NodeExistsException e) {
-      // its okay if another beats us creating the node
-    }
+      cmdExecutor.ensureExists(electZKPath);
   }
   
   /**

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Jan  6 21:36:07 2012
@@ -34,14 +34,14 @@ import org.apache.solr.common.cloud.Clou
 import org.apache.solr.common.cloud.CoreState;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkOperation;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -64,10 +64,12 @@ public class Overseer implements NodeSta
 
   // shard leader watchers  (collection->slice->watcher)
   private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
+  private ZkCmdExecutor zkCmdExecutor;
 
   public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
     log.info("Constructing new Overseer");
     this.zkClient = zkClient;
+    this.zkCmdExecutor = new ZkCmdExecutor(zkClient);
     this.reader = reader;
     createWatches();
   }
@@ -84,32 +86,31 @@ public class Overseer implements NodeSta
   private void addCollectionsWatch() throws KeeperException,
       InterruptedException {
     
-    if(!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
-      try {
-        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE);
-      } catch (NodeExistsException ne) {
-        //ok
-      }
-    }
+    zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE);
     
-    List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+    List<String> collections = zkCmdExecutor.retryOperation(new ZkOperation() {
       @Override
-      public void process(WatchedEvent event) {
-        try {
-          List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this);
-          collectionsChanged(collections);
-        } catch (KeeperException e) {
-            if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
-            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-            return;
+      public Object execute() throws KeeperException, InterruptedException {
+        return zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+          @Override
+          public void process(WatchedEvent event) {
+            try {
+              List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this);
+              collectionsChanged(collections);
+            } catch (KeeperException e) {
+                if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
+                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                return;
+              }
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            }
           }
-        } catch (InterruptedException e) {
-          // Restore the interrupted status
-          Thread.currentThread().interrupt();
-          log.error("", e);
-          throw new ZooKeeperException(
-              SolrException.ErrorCode.SERVER_ERROR, "", e);
-        }
+        });
       }
     });
     
@@ -137,42 +138,42 @@ public class Overseer implements NodeSta
   private void addShardLeadersWatch(final String collection) throws KeeperException,
       InterruptedException {
     
-    if(!zkClient.exists(ZkStateReader.getShardLeadersPath(collection, null))) {
-      try {
-        zkClient.makePath(ZkStateReader.getShardLeadersPath(collection, null));
-      } catch (NodeExistsException ne) {
-        //ok if someone created it
-      }
-    }
+    zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null));
     
-    final List<String> leaderNodes = zkClient.getChildren(
-        ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
-          
-          @Override
-          public void process(WatchedEvent event) {
-            try {
-              List<String> leaderNodes = zkClient.getChildren(
-                  ZkStateReader.getShardLeadersPath(collection, null), this);
+    final List<String> leaderNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
+      
+      @Override
+      public Object execute() throws KeeperException, InterruptedException {
+        return zkClient.getChildren(
+            ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
               
-              processLeaderNodesChanged(collection, leaderNodes);
-            } catch (KeeperException e) {
-              if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                return;
+              @Override
+              public void process(WatchedEvent event) {
+                try {
+                  List<String> leaderNodes = zkClient.getChildren(
+                      ZkStateReader.getShardLeadersPath(collection, null), this);
+                  
+                  processLeaderNodesChanged(collection, leaderNodes);
+                } catch (KeeperException e) {
+                  if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                      || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                    log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                    return;
+                  }
+                  log.error("", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                } catch (InterruptedException e) {
+                  // Restore the interrupted status
+                  Thread.currentThread().interrupt();
+                  log.error("", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                }
               }
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            }
-          }
-        });
+            });
+      }
+    });
     
     processLeaderNodesChanged(collection, leaderNodes);
   }
@@ -217,35 +218,41 @@ public class Overseer implements NodeSta
 
   private void addLiveNodesWatch() throws KeeperException,
       InterruptedException {
-    List<String> liveNodes = zkClient.getChildren(
-        ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
-          
-          @Override
-          public void process(WatchedEvent event) {
-            try {
-                List<String> liveNodes = zkClient.getChildren(
-                    ZkStateReader.LIVE_NODES_ZKNODE, this);
-                Set<String> liveNodesSet = new HashSet<String>();
-                liveNodesSet.addAll(liveNodes);
-                processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
-            } catch (KeeperException e) {
-              if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                return;
+    List<String> liveNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
+      
+      @Override
+      public Object execute() throws KeeperException, InterruptedException {
+        return zkClient.getChildren(
+            ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
+              
+              @Override
+              public void process(WatchedEvent event) {
+                try {
+                    List<String> liveNodes = zkClient.getChildren(
+                        ZkStateReader.LIVE_NODES_ZKNODE, this);
+                    Set<String> liveNodesSet = new HashSet<String>();
+                    liveNodesSet.addAll(liveNodes);
+                    processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
+                } catch (KeeperException e) {
+                  if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                      || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                    log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                    return;
+                  }
+                  log.error("", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                } catch (InterruptedException e) {
+                  // Restore the interrupted status
+                  Thread.currentThread().interrupt();
+                  log.error("", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                }
               }
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            }
-          }
-        });
+            });
+      }
+    });
     
     processLiveNodesChanged(Collections.<String>emptySet(), liveNodes);
   }
@@ -270,13 +277,7 @@ public class Overseer implements NodeSta
       final String path = STATES_NODE + "/" + nodeName;
       synchronized (nodeStateWatches) {
         if (!nodeStateWatches.containsKey(nodeName)) {
-          try {
-            if (!zkClient.exists(path)) {
-              zkClient.makePath(path);
-            }
-          } catch (NodeExistsException e) {
-            // thats okay...
-          }
+          zkCmdExecutor.ensureExists(path);
           nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
         } else {
           log.debug("watch already added");
@@ -337,10 +338,17 @@ public class Overseer implements NodeSta
       for (CoreState state : states) {
         cloudState = updateState(cloudState, nodeName, state);
       }
-      
+      final CloudState finalState = cloudState;
       try {
-        zkClient.setData(ZkStateReader.CLUSTER_STATE,
-            ZkStateReader.toJSON(cloudState));
+        zkCmdExecutor.retryOperation(new ZkOperation() {
+          
+          @Override
+          public Object execute() throws KeeperException, InterruptedException {
+            zkClient.setData(ZkStateReader.CLUSTER_STATE,
+                ZkStateReader.toJSON(finalState));            
+            return null;
+          }
+        });
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -355,14 +363,8 @@ public class Overseer implements NodeSta
       log.info("creating node:" + node);
     }
     
-    try {
-      if (!zkClient.exists(node)) {
-        zkClient.makePath(node, CreateMode.PERSISTENT, null);
-      }
-      
-    } catch (NodeExistsException e) {
-      // it's ok
-    }
+    ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(zkClient);
+    zkCmdExecutor.ensureExists(node);
   }
   
   private CloudState updateSlice(CloudState state, String collection, Slice slice) {
@@ -453,13 +455,21 @@ public class Overseer implements NodeSta
       try {
         reader.updateCloudState(true); // get fresh copy of the state
       final CloudState state = reader.getCloudState();
-      CloudState newState = setShardLeader(state, collection, shardId,
+      final CloudState newState = setShardLeader(state, collection, shardId,
           props.getCoreUrl());
         if (state != newState) { // if same instance was returned no need to
                                  // update state
           log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
-          zkClient.setData(ZkStateReader.CLUSTER_STATE,
-              ZkStateReader.toJSON(newState));
+          zkCmdExecutor.retryOperation(new ZkOperation() {
+            
+            @Override
+            public Object execute() throws KeeperException, InterruptedException {
+              zkClient.setData(ZkStateReader.CLUSTER_STATE,
+                  ZkStateReader.toJSON(newState));
+              return null;
+            }
+          });
+          
         } else {
           log.debug("State was not changed.");
         }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Jan  6 21:36:07 2012
@@ -291,20 +291,11 @@ public final class ZkController {
       }
       
       // makes nodes zkNode
-      try {
-        zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
-      } catch (KeeperException e) {
-        // its okay if another beats us creating the node
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        }
-      }
+      cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE);
       
       Overseer.createClientNodes(zkClient, getNodeName());
       createEphemeralLiveNode();
-      setUpCollectionsNode();
+      cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE);
 
       overseerElector = new LeaderElector(zkClient);
       ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
@@ -633,20 +624,6 @@ public final class ZkController {
     zkClient.printLayoutToStdOut();
   }
 
-  private void setUpCollectionsNode() throws KeeperException, InterruptedException {
-    try {
-      if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
-        if (log.isInfoEnabled()) {
-          log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
-        }
-        // makes collections zkNode if it doesn't exist
-        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
-      }
-    } catch (NodeExistsException e) {
-      // its okay if another beats us creating the node
-    }
-  }
-
   public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
     String collection = cd.getCollectionName();
     

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java?rev=1228419&r1=1228418&r2=1228419&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java Fri Jan  6 21:36:07 2012
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.log4j.Logger;
+import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 
@@ -100,34 +102,25 @@ public class ZkCmdExecutor {
     throw exception;
   }
   
-  /**
-   * Ensures that the given path exists with no data, the current ACL and no
-   * flags
-   * 
-   * @param path
-   * @throws IOException 
-   */
-  protected void ensurePathExists(String path) {
-    ensureExists(path, null, acl, CreateMode.PERSISTENT);
+  public void ensureExists(String path) {
+    ensureExists(path, null, CreateMode.PERSISTENT);
   }
   
-  /**
-   * Ensures that the given path exists with the given data, ACL and flags
-   * 
-   * @param path
-   * @param acl
-   * @param flags
-   * @throws IOException 
-   */
-  protected void ensureExists(final String path, final byte[] data,
-      final List<ACL> acl, final CreateMode flags) {
+  public void ensureExists(final String path, final byte[] data,
+      CreateMode createMode) {
     try {
       retryOperation(new ZkOperation() {
         public Object execute() throws KeeperException, InterruptedException {
           if (zkClient.exists(path)) {
             return true;
           }
-          zkClient.create(path, data, acl, flags);
+          try {
+            zkClient.makePath(path, data);
+          } catch (NodeExistsException e) {
+            // its okay if another beats us creating the node
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          }
           return true;
         }
       });