You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/30 18:35:25 UTC

svn commit: r1379008 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/handler/admin/ solr/core/src/java/org/apache/solr/update...

Author: markrmiller
Date: Thu Aug 30 16:35:24 2012
New Revision: 1379008

URL: http://svn.apache.org/viewvc?rev=1379008&view=rev
Log:
SOLR-3750: On session expiration, we should explicitly wait some time before running the leader sync process so that we are sure every node participates.
  
SOLR-3772: On cluster startup, we should wait until we see all registered replicas before running the leader process, or, if they do not all come up, wait N amount of time.

SOLR-3752: When a leader goes down, have the Overseer clear the leader state in cluster.json.

SOLR-3751: Add defensive checks for SolrCloud updates and requests that ensure the local state matches what the request expected, as far as we can tell.

Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
    lucene/dev/branches/branch_4x/solr/test-framework/   (props changed)
    lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
    lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Thu Aug 30 16:35:24 2012
@@ -81,6 +81,14 @@ Bug Fixes
 * SOLR-3721: Fix bug that could allow multiple recoveries to run briefly at
   the same time if the recovery thread join call was interrupted.
   (Per Steffensen, Mark Miller)
+  
+* SOLR-3750: On session expiration, we should explicitly wait some time before 
+  running the leader sync process so that we are sure every node participates.
+  (Per Steffensen, Mark Miller)
+  
+* SOLR-3772: On cluster startup, we should wait until we see all registered 
+  replicas before running the leader process - or if they all do not come up, 
+  N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
 
 Other Changes
 ----------------------
@@ -94,6 +102,12 @@ Other Changes
 * SOLR-2747: Updated changes2html.pl to handle Solr's CHANGES.txt; added
   target 'changes-to-html' to solr/build.xml.
   (Steve Rowe, Robert Muir)
+  
+* SOLR-3752: When a leader goes down, have the Overseer clear the leader state
+  in cluster.json (Mark Miller)
+
+* SOLR-3751: Add defensive checks for SolrCloud updates and requests that ensure 
+  the local state matches what we can tell the request expected. (Mark Miller)
 
 ==================  4.0.0-BETA ===================
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Thu Aug 30 16:35:24 2012
@@ -26,6 +26,12 @@ public class CloudDescriptor {
   private String roles = null;
   private Integer numShards;
   
+  volatile boolean isLeader = false;
+  
+  public boolean isLeader() {
+    return isLeader;
+  }
+
   public void setShardId(String shardId) {
     this.shardId = shardId;
   }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Thu Aug 30 16:35:24 2012
@@ -2,6 +2,8 @@ package org.apache.solr.cloud;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -15,7 +17,6 @@ import org.apache.solr.core.CoreContaine
 import org.apache.solr.core.SolrCore;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,31 +83,26 @@ class ShardLeaderElectionContextBase ext
   }
 
   @Override
-  void runLeaderProcess(boolean weAreReplacement)
-      throws KeeperException, InterruptedException, IOException {
-
-    try {
-      zkClient.makePath(leaderPath,
-          leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
-          CreateMode.EPHEMERAL, true);
-    } catch (NodeExistsException e) {
-      // if a previous leader ephemeral still exists for some reason, try and
-      // remove it
-      zkClient.delete(leaderPath, -1, true);
-      zkClient.makePath(leaderPath,
-          leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
-          CreateMode.EPHEMERAL, true);
-    }
+  void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+      InterruptedException, IOException {
+    // this pause is important (and seems to work also at 100ms to 1 second in
+    // many cases),
+    // but I don't know why yet :*( - it must come before this publish call
+    // and can happen at the start of leader election process even
+    Thread.sleep(500);
+    
+    zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
+        CreateMode.EPHEMERAL, true);
     
-    // TODO: above we make it looks like leaderProps could be true, but here
-    // you would get an NPE if it was.
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        "leader", ZkStateReader.SHARD_ID_PROP, shardId,
-        ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
-        leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
-        ZkStateReader.CORE_NAME_PROP, leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP));
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "leader",
+        ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
+        collection, ZkStateReader.BASE_URL_PROP, leaderProps.getProperties()
+            .get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
+        leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
+        ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
     Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
-  } 
+    
+  }
 
 }
 
@@ -117,66 +113,182 @@ final class ShardLeaderElectionContext e
   private ZkController zkController;
   private CoreContainer cc;
   private SyncStrategy syncStrategy = new SyncStrategy();
+
+  private boolean afterExpiration;
   
   public ShardLeaderElectionContext(LeaderElector leaderElector, 
       final String shardId, final String collection,
-      final String shardZkNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
+      final String shardZkNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc, boolean afterExpiration) {
     super(leaderElector, shardId, collection, shardZkNodeName, props,
         zkController.getZkStateReader());
     this.zkController = zkController;
     this.cc = cc;
+    this.afterExpiration = afterExpiration;
   }
   
   @Override
-  void runLeaderProcess(boolean weAreReplacement)
-      throws KeeperException, InterruptedException, IOException {
-    if (cc != null) {
-      String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
-      SolrCore core = null;
+  void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+      InterruptedException, IOException {
+    String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
+    
+    // clear the leader in clusterstate
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "leader",
+        ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP,
+        collection);
+    Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
+    
+    waitForReplicasToComeUp(weAreReplacement);
+    
+    // wait for local leader state to clear...
+    // int tries = 0;
+    // while (zkController.getClusterState().getLeader(collection, shardId) !=
+    // null) {
+    // System.out.println("leader still shown " + tries + " " +
+    // zkController.getClusterState().getLeader(collection, shardId));
+    // Thread.sleep(1000);
+    // tries++;
+    // if (tries == 30) {
+    // break;
+    // }
+    // }
+    // Thread.sleep(1000);
+    
+    SolrCore core = null;
+    try {
+      
+      core = cc.getCore(coreName);
+      
+      if (core == null) {
+        cancelElection();
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Fatal Error, SolrCore not found:" + coreName + " in "
+                + cc.getCoreNames());
+      }
+      
+      // should I be leader?
+      if (weAreReplacement && !shouldIBeLeader(leaderProps, core)) {
+        // System.out.println("there is a better leader candidate it appears");
+        rejoinLeaderElection(leaderSeqPath, core);
+        return;
+      }
+      
+      if (weAreReplacement) {
+        log.info("I may be the new leader - try and sync");
+        // we are going to attempt to be the leader
+        // first cancel any current recovery
+        core.getUpdateHandler().getSolrCoreState().cancelRecovery();
+        boolean success = syncStrategy.sync(zkController, core, leaderProps);
+        // solrcloud_debug
+        // try {
+        // RefCounted<SolrIndexSearcher> searchHolder =
+        // core.getNewestSearcher(false);
+        // SolrIndexSearcher searcher = searchHolder.get();
+        // try {
+        // System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+        // + " synched "
+        // + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+        // } finally {
+        // searchHolder.decref();
+        // }
+        // } catch (Exception e) {
+        //
+        // }
+        if (!success && anyoneElseActive()) {
+          rejoinLeaderElection(leaderSeqPath, core);
+          return;
+        }
+      }
+      
+      log.info("I am the new leader: "
+          + ZkCoreNodeProps.getCoreUrl(leaderProps));
+      
+    } finally {
+      if (core != null) {
+        core.close();
+      }
+    }
+    
+    try {
+      super.runLeaderProcess(weAreReplacement);
+    } catch (Throwable t) {
+      cancelElection();
       try {
-     
         core = cc.getCore(coreName);
-
-        if (core == null) {
-          cancelElection();
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Fatal Error, SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+        core.getCoreDescriptor().getCloudDescriptor().isLeader = false;
+        if (!cc.isShutDown()) {
+          // we could not publish ourselves as leader - rejoin election
+          rejoinLeaderElection(coreName, core);
         }
-        // should I be leader?
-        if (weAreReplacement && !shouldIBeLeader(leaderProps)) {
-          // System.out.println("there is a better leader candidate it appears");
-          rejoinLeaderElection(leaderSeqPath, core);
-          return;
+      } finally {
+        if (core != null) {
+          core.close();
         }
+      }
+      
+    }
+    
+    try {
+      core = cc.getCore(coreName);
+      // we do this after the above super. call so that we don't
+      // briefly think we are the leader and then end up not being
+      // able to publish that we are the leader.
+      core.getCoreDescriptor().getCloudDescriptor().isLeader = true;
+    } finally {
+      if (core != null) {
+        core.close();
+      }
+    }
+    
+  }
 
-        if (weAreReplacement) {
-          if (zkClient.exists(leaderPath, true)) {
-            zkClient.delete(leaderPath, -1, true);
+  private void waitForReplicasToComeUp(boolean weAreReplacement)
+      throws InterruptedException {
+    int retries = 300; // ~ 5 min
+    boolean tryAgain = true;
+    Slice slices = zkController.getClusterState().getSlice(collection, shardId);
+    log.info("Running the leader process. afterExperiation=" + afterExpiration);
+    while (tryAgain || slices == null) {
+      
+      // wait for everyone to be up
+      if (slices != null) {
+        Map<String,ZkNodeProps> shards = slices.getShards();
+        Set<Entry<String,ZkNodeProps>> entrySet = shards.entrySet();
+        int found = 0;
+        tryAgain = false;
+        for (Entry<String,ZkNodeProps> entry : entrySet) {
+          ZkCoreNodeProps props = new ZkCoreNodeProps(entry.getValue());
+          if (props.getState().equals(ZkStateReader.ACTIVE)
+              && zkController.getClusterState().liveNodesContain(
+                  props.getNodeName())) {
+            found++;
           }
-          log.info("I may be the new leader - try and sync");
-          // we are going to attempt to be the leader
-          // first cancel any current recovery
-          core.getUpdateHandler().getSolrCoreState().cancelRecovery();
-          boolean success = syncStrategy.sync(zkController, core, leaderProps);
-          if (!success && anyoneElseActive()) {
-            rejoinLeaderElection(leaderSeqPath, core);
-            return;
-          } 
         }
-        log.info("I am the new leader: " + ZkCoreNodeProps.getCoreUrl(leaderProps));
         
-        // If I am going to be the leader I have to be active
-        core.getUpdateHandler().getSolrCoreState().cancelRecovery();
-        zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+        // on startup and after connection timeout, wait for all known shards
+        if ((afterExpiration || !weAreReplacement)
+            && found >= slices.getShards().size()) {
+          log.info("Enough replicas found to continue.");
+          tryAgain = false;
+        } else if (!afterExpiration && found >= slices.getShards().size() - 1) {
+          // a previous leader went down - wait for one less than the total
+          // known shards
+          log.info("Enough replicas found to continue.");
+          tryAgain = false;
+        } else {
+          log.info("Waiting until we see more replicas up");
+        }
         
-      } finally {
-        if (core != null ) {
-          core.close();
+        retries--;
+        if (retries == 0) {
+          log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
+          break;
         }
       }
-      
+      if (tryAgain) {
+        Thread.sleep(1000);
+        slices = zkController.getClusterState().getSlice(collection, shardId);
+      }
     }
-    
-    super.runLeaderProcess(weAreReplacement);
   }
 
   private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
@@ -195,7 +307,8 @@ final class ShardLeaderElectionContext e
     leaderElector.joinElection(this);
   }
   
-  private boolean shouldIBeLeader(ZkNodeProps leaderProps) {
+  private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
+    log.info("Checking if I should try and be the leader.");
     ClusterState clusterState = zkController.getZkStateReader().getClusterState();
     Map<String,Slice> slices = clusterState.getSlices(this.collection);
     Slice slice = slices.get(shardId);
@@ -210,6 +323,7 @@ final class ShardLeaderElectionContext e
           && clusterState.liveNodesContain(shard.getValue().get(
               ZkStateReader.NODE_NAME_PROP))) {
           // we are alive
+          log.info("I am Active and live, it's okay to be the leader.");
           return true;
         }
       }
@@ -222,7 +336,19 @@ final class ShardLeaderElectionContext e
         foundSomeoneElseActive = true;
       }
     }
-    
+    if (!foundSomeoneElseActive) {
+      log.info("I am not Active but no one else is either, it's okay to be the leader");
+      try {
+        zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+      } catch (KeeperException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    } else {
+      log.info("I am not Active and someone else appears to be a better leader candidate.");
+    }
     return !foundSomeoneElseActive;
   }
   
@@ -261,24 +387,16 @@ final class OverseerElectionContext exte
   }
 
   @Override
-  void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
+  void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
+      InterruptedException {
     
-    final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf("/")+1);
+    final String id = leaderSeqPath
+        .substring(leaderSeqPath.lastIndexOf("/") + 1);
     ZkNodeProps myProps = new ZkNodeProps("id", id);
-
-    try {
-      zkClient.makePath(leaderPath,
-          ZkStateReader.toJSON(myProps),
-          CreateMode.EPHEMERAL, true);
-    } catch (NodeExistsException e) {
-      // if a previous leader ephemeral still exists for some reason, try and
-      // remove it
-      zkClient.delete(leaderPath, -1, true);
-      zkClient.makePath(leaderPath,
-          ZkStateReader.toJSON(myProps),
-          CreateMode.EPHEMERAL, true);
-    }
-  
+    
+    zkClient.makePath(leaderPath, ZkStateReader.toJSON(myProps),
+        CreateMode.EPHEMERAL, true);
+    
     overseer.start(id);
   }
   

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Thu Aug 30 16:35:24 2012
@@ -93,6 +93,13 @@ public  class LeaderElector {
     sortSeqs(seqs);
     List<Integer> intSeqs = getSeqs(seqs);
     if (seq <= intSeqs.get(0)) {
+      // first we delete the node advertising the old leader in case the ephem is still there
+      try {
+        zkClient.delete(context.leaderPath, -1, true);
+      } catch(Exception e) {
+        // fine
+      }
+
       runIamLeaderProcess(context, replacement);
     } else {
       // I am not the leader - watch the node below me
@@ -138,6 +145,7 @@ public  class LeaderElector {
       } catch (KeeperException.SessionExpiredException e) {
         throw e;
       } catch (KeeperException e) {
+        SolrException.log(log, "Failed setting watch", e);
         // we couldn't set our watch - the node before us may already be down?
         // we need to check if we are the leader again
         checkIfIamLeader(seq, context, true);

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Aug 30 16:35:24 2012
@@ -166,12 +166,19 @@ public class Overseer {
       } else if (DELETECORE.equals(operation)) {
         clusterState = removeCore(clusterState, message);
       } else if (ZkStateReader.LEADER_PROP.equals(operation)) {
+
+        StringBuilder sb = new StringBuilder();
         String baseUrl = message.get(ZkStateReader.BASE_URL_PROP);
         String coreName = message.get(ZkStateReader.CORE_NAME_PROP);
-        final String leaderUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+        sb.append(baseUrl);
+        if (baseUrl != null && !baseUrl.endsWith("/")) sb.append("/");
+        sb.append(coreName == null ? "" : coreName);
+        if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/");
         clusterState = setShardLeader(clusterState,
             message.get(ZkStateReader.COLLECTION_PROP),
-            message.get(ZkStateReader.SHARD_ID_PROP), leaderUrl);
+            message.get(ZkStateReader.SHARD_ID_PROP),
+            sb.length() > 0 ? sb.toString() : null);
+
       } else {
         throw new RuntimeException("unknown operation:" + operation
             + " contents:" + message.getProperties());

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Thu Aug 30 16:35:24 2012
@@ -185,6 +185,7 @@ public class RecoveryStrategy extends Th
     prepCmd.setCoreNodeName(coreZkNodeName);
     prepCmd.setState(ZkStateReader.RECOVERING);
     prepCmd.setCheckLive(true);
+    prepCmd.setOnlyIfLeader(true);
     prepCmd.setPauseFor(6000);
     
     server.request(prepCmd);
@@ -239,6 +240,7 @@ public class RecoveryStrategy extends Th
       return;
     }
 
+    boolean firstTime = true;
 
     List<Long> recentVersions;
     UpdateLog.RecentUpdates recentUpdates = null;
@@ -273,9 +275,6 @@ public class RecoveryStrategy extends Th
       log.info("###### startupVersions=" + startingVersions);
     }
 
-
-    boolean firstTime = true;
-
     if (recoveringAfterStartup) {
       // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
       // when we went down.  We may have received updates since then.
@@ -305,7 +304,10 @@ public class RecoveryStrategy extends Th
         String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
 
         boolean isLeader = leaderUrl.equals(ourUrl);
-        if (isLeader) {
+        if (isLeader && !cloudDesc.isLeader) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+        }
+        if (cloudDesc.isLeader) {
           // we are now the leader - no one else must have been suitable
           log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
           log.info("Finished recovery process. core=" + coreName);
@@ -333,9 +335,6 @@ public class RecoveryStrategy extends Th
                 new ModifiableSolrParams());
             core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
             log.info("PeerSync Recovery was successful - registering as Active. core=" + coreName);
-            // System.out
-            // .println("Sync Recovery was successful - registering as Active "
-            // + zkController.getNodeName());
 
             // solrcloud_debug
             // try {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Thu Aug 30 16:35:24 2012
@@ -108,7 +108,8 @@ public class SyncStrategy {
       if (!success
           && !areAnyOtherReplicasActive(zkController, leaderProps, collection,
               shardId)) {
-        log.info("Sync was not a success but no on else i active! I am the leader");
+        log.info("Sync was not a success but no one else is active! I am the leader");
+        zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
         success = true;
       }
       
@@ -224,14 +225,14 @@ public class SyncStrategy {
            
            requestRecovery(((ShardCoreRequest)srsp.getShardRequest()).baseUrl, ((ShardCoreRequest)srsp.getShardRequest()).coreName);
 
-         } catch (Exception e) {
-           SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", e);
+         } catch (Throwable t) {
+           SolrException.log(log, ZkCoreNodeProps.getCoreUrl(leaderProps) + ": Could not tell a replica to recover", t);
          }
       } else {
         log.info(ZkCoreNodeProps.getCoreUrl(leaderProps) + ": " + " sync completed with " + srsp.getShardAddress());
       }
+      
     }
-    
 
   }
   

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Thu Aug 30 16:35:24 2012
@@ -41,6 +41,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -112,8 +113,12 @@ public final class ZkController {
   private final String nodeName;           // example: 127.0.0.1:54065_solr
   private final String baseURL;            // example: http://127.0.0.1:54065/solr
 
+
   private LeaderElector overseerElector;
+  
 
+  // for now, this can be null in tests, in which case recovery will be inactive, and other features
+  // may accept defaults or use mocks rather than pulling things from a CoreContainer
   private CoreContainer cc;
 
   protected volatile Overseer overseer;
@@ -181,7 +186,11 @@ public final class ZkController {
                   // TODO: we need to think carefully about what happens when it was
                   // a leader that was expired - as well as what to do about leaders/overseers
                   // with connection loss
-                  register(descriptor.getName(), descriptor, true);
+                  try {
+                    register(descriptor.getName(), descriptor, true, true);
+                  } catch (Throwable t) {
+                    SolrException.log(log, "Error registering SolrCore", t);
+                  }
                 }
               }
   
@@ -200,6 +209,45 @@ public final class ZkController {
 
  
         });
+    
+    zkClient.getZkClientConnectionStrategy().addDisconnectedListener(new ZkClientConnectionStrategy.DisconnectedListener() {
+      
+      @Override
+      public void disconnected() {
+        List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+        // re register all descriptors
+        if (descriptors  != null) {
+          for (CoreDescriptor descriptor : descriptors) {
+            descriptor.getCloudDescriptor().isLeader = false;
+          }
+        }
+      }
+    });
+    
+    zkClient.getZkClientConnectionStrategy().addConnectedListener(new ZkClientConnectionStrategy.ConnectedListener() {
+      
+      @Override
+      public void connected() {
+        List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+        if (descriptors  != null) {
+          for (CoreDescriptor descriptor : descriptors) {
+            CloudDescriptor cloudDesc = descriptor.getCloudDescriptor();
+            String leaderUrl;
+            try {
+              leaderUrl = getLeaderProps(cloudDesc.getCollectionName(), cloudDesc.getShardId())
+                  .getCoreUrl();
+            } catch (InterruptedException e) {
+              throw new RuntimeException();
+            }
+            String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), descriptor.getName());
+            boolean isLeader = leaderUrl.equals(ourUrl);
+            log.info("SolrCore connected to ZooKeeper - we are " + ourUrl + " and leader is " + leaderUrl);
+            cloudDesc.isLeader = isLeader;
+          }
+        }
+      }
+    });
+    
     this.overseerJobQueue = Overseer.getInQueue(zkClient);
     this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
     cmdExecutor = new ZkCmdExecutor();
@@ -468,7 +516,7 @@ public final class ZkController {
    * @throws Exception
    */
   public String register(String coreName, final CoreDescriptor desc) throws Exception {  
-    return register(coreName, desc, false);
+    return register(coreName, desc, false, false); // recoverReloadedCores=false, afterExpiration=false
   }
   
 
@@ -478,10 +526,11 @@ public final class ZkController {
    * @param coreName
    * @param desc
    * @param recoverReloadedCores
+   * @param afterExpiration
    * @return the shardId for the SolrCore
    * @throws Exception
    */
-  public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores) throws Exception {  
+  public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception {  
     final String baseUrl = getBaseUrl();
     
     final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
@@ -506,7 +555,7 @@ public final class ZkController {
     ZkNodeProps leaderProps = new ZkNodeProps(props);
     
     try {
-      joinElection(desc);
+      joinElection(desc, afterExpiration);
     } catch (InterruptedException e) {
       // Restore the interrupted status
       Thread.currentThread().interrupt();
@@ -517,25 +566,7 @@ public final class ZkController {
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
     }
     
-    // rather than look in the cluster state file, we go straight to the zknodes
-    // here, because on cluster restart there could be stale leader info in the
-    // cluster state node that won't be updated for a moment
-    String leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
-    
-    // now wait until our currently cloud state contains the latest leader
-    String clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000);
-    int tries = 0;
-    while (!leaderUrl.equals(clusterStateLeader)) {
-      if (tries == 60) {
-        throw new SolrException(ErrorCode.SERVER_ERROR,
-            "There is conflicting information about the leader of shard: "
-                + cloudDesc.getShardId() + " our state says:" + clusterStateLeader + " but zookeeper says:" + leaderUrl);
-      }
-      Thread.sleep(1000);
-      tries++;
-      clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId, 30000);
-      leaderUrl = getLeaderProps(collection, cloudDesc.getShardId()).getCoreUrl();
-    }
+    String leaderUrl = getLeader(cloudDesc);
     
     String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     log.info("We are " + ourUrl + " and leader is " + leaderUrl);
@@ -568,8 +599,7 @@ public final class ZkController {
         } else {
           log.info("No LogReplay needed for core="+core.getName() + " baseURL=" + baseUrl);
         }
-      }
-      
+      }      
       boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
           collection, coreZkNodeName, shardId, leaderProps, core, cc);
       if (!didRecovery) {
@@ -580,12 +610,52 @@ public final class ZkController {
         core.close();
       }
     }
+
     
     // make sure we have an update cluster state right away
     zkStateReader.updateClusterState(true);
 
     return shardId;
   }
+
+  /**
+   * Returns the core url of the current leader of the given core's shard.
+   * The leader is read straight from the zk leader node, and we then wait
+   * (retrying once a second, up to 60 times) until our cluster state agrees.
+   *
+   * @throws SolrException if the two never agree, if the lookup fails, or if
+   *         we are interrupted (the interrupt flag is restored first)
+   */
+  private String getLeader(final CloudDescriptor cloudDesc) {
+    String collection = cloudDesc.getCollectionName();
+    String shardId = cloudDesc.getShardId();
+    // rather than look in the cluster state file, we go straight to the zknodes
+    // here, because on cluster restart there could be stale leader info in the
+    // cluster state node that won't be updated for a moment
+    String leaderUrl;
+    try {
+      leaderUrl = getLeaderProps(collection, shardId).getCoreUrl();
+      
+      // now wait until our current cluster state contains the latest leader
+      String clusterStateLeader = zkStateReader.getLeaderUrl(collection,
+          shardId, 30000);
+      int tries = 0;
+      while (!leaderUrl.equals(clusterStateLeader)) {
+        if (tries == 60) {
+          throw new SolrException(ErrorCode.SERVER_ERROR,
+              "There is conflicting information about the leader of shard: "
+                  + shardId + " our state says:"
+                  + clusterStateLeader + " but zookeeper says:" + leaderUrl);
+        }
+        Thread.sleep(1000);
+        tries++;
+        clusterStateLeader = zkStateReader.getLeaderUrl(collection, shardId,
+            30000);
+        leaderUrl = getLeaderProps(collection, shardId).getCoreUrl();
+      }
+    } catch (SolrException e) {
+      // already descriptive - don't wrap it a second time
+      log.error("Error getting leader from zk", e);
+      throw e;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Interrupted while getting leader from zk", e);
+    } catch (Exception e) {
+      log.error("Error getting leader from zk", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error getting leader from zk", e);
+    }
+    return leaderUrl;
+  }
   
   /**
    * Get leader props directly from zk nodes.
@@ -597,8 +667,9 @@ public final class ZkController {
    * @throws InterruptedException
    */
   private ZkCoreNodeProps getLeaderProps(final String collection,
-      final String slice) throws KeeperException, InterruptedException {
+      final String slice) throws InterruptedException {
     int iterCount = 60;
+    Exception exp = null;
     while (iterCount-- > 0) {
       try {
         byte[] data = zkClient.getData(
@@ -607,15 +678,21 @@ public final class ZkController {
         ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(
             ZkNodeProps.load(data));
         return leaderProps;
-      } catch (NoNodeException e) {
+      } catch (InterruptedException e) {
+        throw e;
+      } catch (Exception e) {
+        exp = e;
         Thread.sleep(500);
       }
+      if (cc.isShutDown()) {
+        throw new RuntimeException("CoreContainer is shutdown");
+      }
     }
-    throw new RuntimeException("Could not get leader props");
+    throw new RuntimeException("Could not get leader props", exp);
   }
 
 
-  private void joinElection(CoreDescriptor cd) throws InterruptedException, KeeperException, IOException {
+  private void joinElection(CoreDescriptor cd, boolean afterExpiration) throws InterruptedException, KeeperException, IOException {
     
     String shardId = cd.getCloudDescriptor().getShardId();
     
@@ -631,7 +708,7 @@ public final class ZkController {
         .getCollectionName();
     
     ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
-        collection, coreZkNodeName, ourProps, this, cc);
+        collection, coreZkNodeName, ourProps, this, cc, afterExpiration);
 
     leaderElector.setup(context);
     electionContexts.put(coreZkNodeName, context);

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java Thu Aug 30 16:35:24 2012
@@ -598,15 +598,16 @@ public class CoreContainer 
         }
         cores.clear();
       } finally {
+        if (shardHandlerFactory != null) {
+          shardHandlerFactory.close();
+        }
+        // we want to close zk stuff last
         if(zkController != null) {
           zkController.close();
         }
         if (zkServer != null) {
           zkServer.stop();
         }
-        if (shardHandlerFactory != null) {
-          shardHandlerFactory.close();
-        }
       }
     }
   }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Thu Aug 30 16:35:24 2012
@@ -721,6 +721,21 @@ public class CoreAdminHandler extends Re
         props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName());
         
         boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props));
+        // solrcloud_debug
+//         try {
+//         RefCounted<SolrIndexSearcher> searchHolder =
+//         core.getNewestSearcher(false);
+//         SolrIndexSearcher searcher = searchHolder.get();
+//         try {
+//         System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName()
+//         + " synched "
+//         + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+//         } finally {
+//         searchHolder.decref();
+//         }
+//         } catch (Exception e) {
+//        
+//         }
         if (!success) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Sync Failed");
         }
@@ -750,8 +765,11 @@ public class CoreAdminHandler extends Re
     String coreNodeName = params.get("coreNodeName");
     String waitForState = params.get("state");
     Boolean checkLive = params.getBool("checkLive");
+    Boolean onlyIfLeader = params.getBool("onlyIfLeader");
     int pauseFor = params.getInt("pauseFor", 0);
     
+
+    
     String state = null;
     boolean live = false;
     int retry = 0;
@@ -764,6 +782,12 @@ public class CoreAdminHandler extends Re
               + cname);
         }
         if (core != null) {
+          if (onlyIfLeader != null && onlyIfLeader) {
+           if (!core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
+             throw new SolrException(ErrorCode.BAD_REQUEST, "We are not the leader");
+           }
+          }
+          
           // wait until we are sure the recovering node is ready
           // to accept updates
           CloudDescriptor cloudDescriptor = core.getCoreDescriptor()

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Thu Aug 30 16:35:24 2012
@@ -166,6 +166,8 @@ public class SolrCmdDistributor {
     
     addCommit(ureq, cmd);
     
+    log.info("Distrib commit to:" + nodes);
+    
     for (Node node : nodes) {
       submit(ureq, node);
     }
@@ -345,7 +347,8 @@ public class SolrCmdDistributor {
     try {
       semaphore.acquire();
     } catch (InterruptedException e) {
-      throw new RuntimeException();
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Update thread interrupted");
     }
     pending.add(completionService.submit(task));
     

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Aug 30 16:35:24 2012
@@ -183,15 +183,9 @@ public class DistributedUpdateProcessor 
       // set num nodes
       numNodes = zkController.getClusterState().getLiveNodes().size();
       
-      // the leader is...
-      // TODO: if there is no leader, wait and look again
-      // TODO: we are reading the leader from zk every time - we should cache
-      // this and watch for changes?? Just pull it from ZkController cluster state probably?
       String shardId = getShard(hash, collection, zkController.getClusterState()); // get the right shard based on the hash...
 
       try {
-        // TODO: if we find out we cannot talk to zk anymore, we should probably realize we are not
-        // a leader anymore - we shouldn't accept updates at all??
         ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(zkController.getZkStateReader().getLeaderProps(
             collection, shardId));
         
@@ -201,7 +195,10 @@ public class DistributedUpdateProcessor 
         isLeader = coreNodeName.equals(leaderNodeName);
         
         DistribPhase phase = 
-          DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+            DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+       
+        doDefensiveChecks(shardId, phase);
+     
 
         if (DistribPhase.FROMLEADER == phase) {
           // we are coming from the leader, just go local - add no urls
@@ -251,6 +248,36 @@ public class DistributedUpdateProcessor 
     return nodes;
   }
 
+  /**
+   * Sanity checks a request that claims to come from the shard leader
+   * (phase FROMLEADER with a "distrib.from" param). Requests without
+   * "distrib.from" (e.g. log replay) are not checked.
+   *
+   * @throws SolrException BAD_REQUEST if we are the leader ourselves, or if the
+   *         sender is not who our cluster state says is the leader
+   */
+  private void doDefensiveChecks(String shardId, DistribPhase phase) {
+    String from = req.getParams().get("distrib.from");
+    if (DistribPhase.FROMLEADER != phase || from == null) { // from will be null on log replay
+      return;
+    }
+
+    boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
+    if (localIsLeader) {
+      log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Request says it is coming from leader, but we are the leader");
+    }
+
+    ZkCoreNodeProps clusterStateLeader = new ZkCoreNodeProps(zkController
+        .getClusterState().getLeader(collection, shardId));
+    String coreUrl = clusterStateLeader.getNodeProps() == null ? null
+        : clusterStateLeader.getCoreUrl();
+    // from is non-null here, so a missing leader (null coreUrl) counts as a mismatch
+    if (!from.equals(coreUrl)) {
+      log.error("We got a request from the leader, but it's not who our cluster state says is the leader :"
+          + req.getParamString()
+          + " : "
+          + coreUrl);
+      throw new SolrException(ErrorCode.BAD_REQUEST, "We got a request from the leader, but it's not who our cluster state says is the leader.");
+    }
+  }
+
 
   private String getShard(int hash, String collection, ClusterState clusterState) {
     // ranges should be part of the cloud state and eventually gotten from zk
@@ -329,6 +356,8 @@ public class DistributedUpdateProcessor 
                   DistribPhase.FROMLEADER.toString() : 
                   DistribPhase.TOLEADER.toString()));
       params.remove("commit"); // this will be distributed from the local commit
+      params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+          zkController.getBaseUrl(), req.getCore().getName()));
       cmdDistrib.distribAdd(cmd, nodes, params);
     }
     
@@ -378,9 +407,11 @@ public class DistributedUpdateProcessor 
 
     // TODO: we should do this in the background it would seem
     for (SolrCmdDistributor.Error error : response.errors) {
-      if (error.node instanceof RetryNode) {
+      if (error.node instanceof RetryNode || error.e instanceof SolrException) {
         // we don't try to force a leader to recover
         // when we cannot forward to it
+        // and we assume SolrException means
+        // the node went down
         continue;
       }
       // TODO: we should force their state to recovering ??
@@ -658,6 +689,10 @@ public class DistributedUpdateProcessor 
                  (isLeader ? 
                   DistribPhase.FROMLEADER.toString() : 
                   DistribPhase.TOLEADER.toString()));
+      if (isLeader) {
+        params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
+            zkController.getBaseUrl(), req.getCore().getName()));
+      }
       params.remove("commit"); // we already will have forwarded this from our local commit
       cmdDistrib.distribDelete(cmd, nodes, params);
     }
@@ -819,6 +854,8 @@ public class DistributedUpdateProcessor 
       ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
       params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
       params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
+      params.set("update.from", ZkCoreNodeProps.getCoreUrl(
+          zkController.getBaseUrl(), req.getCore().getName()));
       cmdDistrib.distribDelete(cmd, replicas, params);
       cmdDistrib.finish();
     }

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Thu Aug 30 16:35:24 2012
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
 public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
   public static Logger log = LoggerFactory.getLogger(ChaosMonkeyNothingIsSafeTest.class);
   
-  private static final int BASE_RUN_LENGTH = 45000;
+  private static final int BASE_RUN_LENGTH = 20000;
 
   @BeforeClass
   public static void beforeSuperClass() {
@@ -56,8 +56,8 @@ public class ChaosMonkeyNothingIsSafeTes
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    // TODO use @Noisy annotation as we expect lots of exceptions
-    //ignoreException(".*");
+    // can help to hide this when testing and looking at logs
+    //ignoreException("shard update error");
     System.setProperty("numShards", Integer.toString(sliceCount));
   }
   
@@ -71,8 +71,8 @@ public class ChaosMonkeyNothingIsSafeTes
   
   public ChaosMonkeyNothingIsSafeTest() {
     super();
-    sliceCount = 3;
-    shardCount = 12;
+    sliceCount = 1;
+    shardCount = 7;
   }
   
   @Override
@@ -83,9 +83,16 @@ public class ChaosMonkeyNothingIsSafeTes
       handle.put("QTime", SKIPVAL);
       handle.put("timestamp", SKIPVAL);
       
+      // make sure we have leaders for each shard; shards are named
+      // shard1..shardN, so the upper bound is inclusive
+      for (int j = 1; j <= sliceCount; j++) {
+        zkStateReader.getLeaderProps(DEFAULT_COLLECTION, "shard" + j, 10000);
+      }
+      
+      waitForRecoveriesToFinish(false);
+      
       // we cannot do delete by query
       // as it's not supported for recovery
-      // del("*:*");
+       del("*:*");
       
       List<StopableThread> threads = new ArrayList<StopableThread>();
       int threadCount = 1;
@@ -152,6 +159,7 @@ public class ChaosMonkeyNothingIsSafeTes
       zkStateReader.updateClusterState(true);
       assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
       
+      
       // we dont't current check vs control because the full throttle thread can
       // have request fails
       checkShardConsistency(false, true);

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Thu Aug 30 16:35:24 2012
@@ -50,11 +50,6 @@ public class ChaosMonkeySafeLeaderTest e
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    // we expect this time of exception as shards go up and down...
-    //ignoreException(".*");
-    
-    // sometimes we cannot get the same port
-    ignoreException("java\\.net\\.BindException: Address already in use");
     
     System.setProperty("numShards", Integer.toString(sliceCount));
   }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Thu Aug 30 16:35:24 2012
@@ -121,7 +121,9 @@ public class CoreAdminRequest extends So
     protected String state;
     protected Boolean checkLive;
     protected Integer pauseFor;
+    protected Boolean onlyIfLeader;
     
+
     public WaitForState() {
       action = CoreAdminAction.PREPRECOVERY;
     }
@@ -166,6 +168,14 @@ public class CoreAdminRequest extends So
       this.pauseFor = pauseFor;
     }
     
+    /**
+     * @return whether the target core must be the shard leader for this
+     *         request to succeed; false when the option was never set
+     */
+    public boolean isOnlyIfLeader() {
+      // the field is a Boolean that stays null until set - avoid an unboxing NPE
+      return Boolean.TRUE.equals(onlyIfLeader);
+    }
+
+    /**
+     * Once set (true or false), the value is sent as the "onlyIfLeader"
+     * request parameter.
+     *
+     * @param onlyIfLeader whether the target core must be the shard leader
+     */
+    public void setOnlyIfLeader(boolean onlyIfLeader) {
+      this.onlyIfLeader = Boolean.valueOf(onlyIfLeader);
+    }
+    
     @Override
     public SolrParams getParams() {
       if( action == null ) {
@@ -195,6 +205,10 @@ public class CoreAdminRequest extends So
       if (pauseFor != null) {
         params.set( "pauseFor", pauseFor);
       }
+      
+      if (onlyIfLeader != null) {
+        params.set( "onlyIfLeader", onlyIfLeader);
+      }
 
       return params;
     }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Thu Aug 30 16:35:24 2012
@@ -38,6 +38,8 @@ class ConnectionManager implements Watch
   private boolean connected;
 
   private final ZkClientConnectionStrategy connectionStrategy;
+  
+  // serializes the keeper update performed after session expiration; final so
+  // every thread always synchronizes on the same monitor
+  private final Object connectionUpdateLock = new Object();
 
   private String zkServerAddress;
 
@@ -72,6 +74,7 @@ class ConnectionManager implements Watch
     }
     
     if (isClosed) {
+      log.info("Client->ZooKeeper status change trigger but we are already closed");
       return;
     }
 
@@ -79,28 +82,25 @@ class ConnectionManager implements Watch
     if (state == KeeperState.SyncConnected) {
       connected = true;
       clientConnected.countDown();
+      connectionStrategy.connected();
     } else if (state == KeeperState.Expired) {
       connected = false;
-      log.info("Attempting to reconnect to recover relationship with ZooKeeper...");
-
+      log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper...");
+      
       try {
         connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
             new ZkClientConnectionStrategy.ZkUpdate() {
               @Override
               public void update(SolrZooKeeper keeper) {
                 // if keeper does not replace oldKeeper we must be sure to close it
-                synchronized (connectionStrategy) {
+                synchronized (connectionUpdateLock) {
                   try {
                     waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
-                  } catch (InterruptedException e1) {
-                    closeKeeper(keeper);
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Giving up on connecting - we were interrupted", e1);
                   } catch (Exception e1) {
                     closeKeeper(keeper);
                     throw new RuntimeException(e1);
                   }
-                  
+                  log.info("Connection with ZooKeeper reestablished.");
                   try {
                     client.updateKeeper(keeper);
                   } catch (InterruptedException e) {
@@ -129,7 +129,9 @@ class ConnectionManager implements Watch
       }
       log.info("Connected:" + connected);
     } else if (state == KeeperState.Disconnected) {
+      log.info("zkClient has disconnected");
       connected = false;
+      connectionStrategy.disconnected();
     } else {
       connected = false;
     }
@@ -151,19 +153,26 @@ class ConnectionManager implements Watch
   }
 
+  /**
+   * Blocks until the client is connected, re-checking at least every 500 ms
+   * for up to waitForConnection ms. Stops waiting early if this manager is
+   * closed.
+   *
+   * @throws TimeoutException if still not connected when we stop waiting
+   * @throws RuntimeException if interrupted; the interrupt flag is restored first
+   */
   public synchronized void waitForConnected(long waitForConnection)
-      throws InterruptedException, TimeoutException {
+      throws TimeoutException {
+    log.info("Waiting for client to connect to ZooKeeper");
     long expire = System.currentTimeMillis() + waitForConnection;
     long left = 1;
     while (!connected && left > 0) {
       if (isClosed) {
         break;
       }
-      wait(500);
+      try {
+        wait(500);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
       left = expire - System.currentTimeMillis();
     }
     if (!connected) {
       throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
     }
+    log.info("Client is connected to ZooKeeper");
   }
 
   public synchronized void waitForDisconnected(long timeout)

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Thu Aug 30 16:35:24 2012
@@ -74,6 +74,7 @@ public class SolrZkClient {
   private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
 
   private volatile boolean isClosed = false;
+  private ZkClientConnectionStrategy zkClientConnectionStrategy;
   
   /**
    * @param zkServerAddress
@@ -116,6 +117,7 @@ public class SolrZkClient {
    */
   public SolrZkClient(String zkServerAddress, int zkClientTimeout,
       ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) {
+    this.zkClientConnectionStrategy = strat;
     connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
         + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
     try {
@@ -135,29 +137,24 @@ public class SolrZkClient {
               }
             }
           });
-    } catch (IOException e) {
-      connManager.close();
-      throw new RuntimeException();
-    } catch (InterruptedException e) {
-      connManager.close();
-      throw new RuntimeException();
-    } catch (TimeoutException e) {
+    } catch (Throwable e) {
       connManager.close();
       throw new RuntimeException();
     }
+    
     try {
       connManager.waitForConnected(clientConnectTimeout);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      connManager.close();
-      throw new RuntimeException();
-    } catch (TimeoutException e) {
+    } catch (Throwable e) {
       connManager.close();
       throw new RuntimeException();
     }
     numOpens.incrementAndGet();
   }
 
+  /**
+   * @return the connection strategy handed to this client's constructor, e.g.
+   *         for registering connected/disconnected listeners
+   */
+  public ZkClientConnectionStrategy getZkClientConnectionStrategy() {
+    return this.zkClientConnectionStrategy;
+  }
+
   /**
    * @return true if client is connected
    */

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java Thu Aug 30 16:35:24 2012
@@ -18,18 +18,65 @@ package org.apache.solr.common.cloud;
  */
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.SolrZooKeeper;
 import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *
  */
 public abstract class ZkClientConnectionStrategy {
+  private static final Logger log = LoggerFactory.getLogger(ZkClientConnectionStrategy.class);
+  
+  // both lists are only accessed while holding this object's monitor
+  private final List<DisconnectedListener> disconnectedListeners = new ArrayList<DisconnectedListener>();
+  private final List<ConnectedListener> connectedListeners = new ArrayList<ConnectedListener>();
+  
   public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
   public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
   
+  /**
+   * Notifies every registered {@link DisconnectedListener}. A listener that
+   * throws is logged and does not prevent the remaining ones from running.
+   */
+  public synchronized void disconnected() {
+    // iterate over a copy: a listener registering another listener re-enters
+    // this monitor and would otherwise cause a ConcurrentModificationException
+    for (DisconnectedListener listener : new ArrayList<DisconnectedListener>(disconnectedListeners)) {
+      try {
+        listener.disconnected();
+      } catch (Throwable t) {
+        SolrException.log(log, "Error notifying DisconnectedListener", t);
+      }
+    }
+  }
+  
+  /**
+   * Notifies every registered {@link ConnectedListener}. A listener that
+   * throws is logged and does not prevent the remaining ones from running.
+   */
+  public synchronized void connected() {
+    // iterate over a copy for the same reason as in disconnected()
+    for (ConnectedListener listener : new ArrayList<ConnectedListener>(connectedListeners)) {
+      try {
+        listener.connected();
+      } catch (Throwable t) {
+        SolrException.log(log, "Error notifying ConnectedListener", t);
+      }
+    }
+  }
+  
+  /** Callback run by {@link #disconnected()}. */
+  public interface DisconnectedListener {
+    public void disconnected();
+  }
+  
+  /** Callback run by {@link #connected()}. */
+  public interface ConnectedListener {
+    public void connected();
+  }
+  
+  /** Registers a listener to be run on every {@link #disconnected()} call. */
+  public synchronized void addDisconnectedListener(DisconnectedListener listener) {
+    disconnectedListeners.add(listener);
+  }
+  
+  /** Registers a listener to be run on every {@link #connected()} call. */
+  public synchronized void addConnectedListener(ConnectedListener listener) {
+    connectedListeners.add(listener);
+  }
+  
   public static abstract class ZkUpdate {
     public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
   }

Modified: lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Thu Aug 30 16:35:24 2012
@@ -801,7 +801,7 @@ public abstract class AbstractFullDistri
             SolrDocumentList lst1 = lastJetty.client.solrClient.query(query).getResults();
             SolrDocumentList lst2 = cjetty.client.solrClient.query(query).getResults();
 
-            showDiff(lst1, lst2, lastJetty.toString(), cjetty.client.solrClient.toString());
+            showDiff(lst1, lst2, lastJetty.url, cjetty.url);
           }
 
         }
@@ -1130,7 +1130,8 @@ public abstract class AbstractFullDistri
       
       try {
         commit();
-      } catch (Exception e) {
+      } catch (Throwable t) {
+        t.printStackTrace();
         // we don't care if this commit fails on some nodes
       }
       
@@ -1146,8 +1147,8 @@ public abstract class AbstractFullDistri
         retry  = true;
       }
       cnt++;
-      if (cnt > 2) break;
-      Thread.sleep(4000);
+      if (cnt > 4) break;
+      Thread.sleep(2000);
     } while (retry);
   }
   

Modified: lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java?rev=1379008&r1=1379007&r2=1379008&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java Thu Aug 30 16:35:24 2012
@@ -17,7 +17,6 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import java.net.BindException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -174,16 +173,10 @@ public class ChaosMonkey {
   public static void kill(CloudJettyRunner cjetty) throws Exception {
     JettySolrRunner jetty = cjetty.jetty;
     monkeyLog("kill shard! " + jetty.getLocalPort());
-    FilterHolder fh = jetty.getDispatchFilter();
-    SolrDispatchFilter sdf = null;
-    if (fh != null) {
-      sdf = (SolrDispatchFilter) fh.getFilter();
-    }
+    
     jetty.stop();
     
-    if (sdf != null) {
-      sdf.destroy();
-    }
+    stop(jetty);
     
     if (!jetty.isStopped()) {
       throw new RuntimeException("could not kill jetty");
@@ -441,6 +434,7 @@ public class ChaosMonkey {
   }
   
   public static boolean start(JettySolrRunner jetty) throws Exception {
+    
     try {
       jetty.start();
     } catch (Exception e) {
@@ -454,7 +448,7 @@ public class ChaosMonkey {
         try {
           jetty.start();
         } catch (Exception e3) {
-          log.error("", e3);
+          log.error("Could not get the port to start jetty again", e3);
           // we coud not get the port
           jetty.stop();
           return false;