You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/09/03 21:46:48 UTC

svn commit: r1380322 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/handler/component/ solr/core/src/java/org/apache/solr/up...

Author: markrmiller
Date: Mon Sep  3 19:46:47 2012
New Revision: 1380322

URL: http://svn.apache.org/viewvc?rev=1380322&view=rev
Log:
SOLR-3772, SOLR-3750: make the leader vote wait optional, with a configurable wait time (leaderVoteWait), defaulting to off
SOLR-3782: A leader going down while updates are coming in can cause shard inconsistency. 

Added:
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
      - copied unchanged from r1380300, lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Mon Sep  3 19:46:47 2012
@@ -99,17 +99,12 @@ Bug Fixes
 
 * SOLR-3770: Overseer may lose updates to cluster state (siren)
 
-* SOLR-3721: Fix bug that could allow multiple recoveries to run briefly at
-  the same time if the recovery thread join call was interrupted.
+* SOLR-3721: Fix bug that could theoretically allow multiple recoveries to run
+  briefly at the same time if the recovery thread join call was interrupted.
   (Per Steffensen, Mark Miller)
-  
-* SOLR-3750: On session expiration, we should explicitly wait some time before 
-  running the leader sync process so that we are sure every node participates.
-  (Per Steffensen, Mark Miller)
-  
-* SOLR-3772: On cluster startup, we should wait until we see all registered 
-  replicas before running the leader process - or if they all do not come up, 
-  N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
+
+* SOLR-3782: A leader going down while updates are coming in can cause shard
+  inconsistency. (Mark Miller)
 
 Other Changes
 ----------------------
@@ -136,8 +131,15 @@ Other Changes
 * SOLR-3780: Maven build: Make solrj tests run separately from solr-core.
   (Steve Rowe)
 
-* SOLR-3707: Upgrade Solr to Tika 1.2 (janhoy)
+* SOLR-3707: Upgrade Solr to Tika 1.2 (janhoy) 
+  
+* SOLR-3772: Optionally, on cluster startup, we can wait until we see all registered 
+  replicas before running the leader process - or if they all do not come up, 
+  N amount of time. (Jan Høydahl, Per Steffensen, Mark Miller)
 
+* SOLR-3750: Optionally, on session expiration, we can explicitly wait some time before 
+  running the leader sync process so that we are sure every node participates.
+  (Per Steffensen, Mark Miller)
 
 ==================  4.0.0-BETA ===================
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Sep  3 19:46:47 2012
@@ -55,6 +55,8 @@ public abstract class ElectionContext {
     this.zkClient = zkClient;
   }
   
+  public void close() {}
+  
   public void cancelElection() throws InterruptedException, KeeperException {
     zkClient.delete(leaderSeqPath, -1, true);
   }
@@ -83,10 +85,6 @@ class ShardLeaderElectionContextBase ext
   @Override
   void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
       InterruptedException, IOException {
-    // this pause is important
-    // but I don't know why yet :*( - it must come before this publish call
-    // and can happen at the start of leader election process even
-    Thread.sleep(100);
     
     zkClient.makePath(leaderPath, ZkStateReader.toJSON(leaderProps),
         CreateMode.EPHEMERAL, true);
@@ -112,6 +110,8 @@ final class ShardLeaderElectionContext e
   private SyncStrategy syncStrategy = new SyncStrategy();
 
   private boolean afterExpiration;
+
+  private volatile boolean isClosed = false;
   
   public ShardLeaderElectionContext(LeaderElector leaderElector, 
       final String shardId, final String collection,
@@ -124,8 +124,15 @@ final class ShardLeaderElectionContext e
   }
   
   @Override
+  public void close() {
+    this.isClosed  = true;
+  }
+  
+  @Override
   void runLeaderProcess(boolean weAreReplacement) throws KeeperException,
       InterruptedException, IOException {
+    log.info("Running the leader process. afterExpiration=" + afterExpiration);
+    
     String coreName = leaderProps.get(ZkStateReader.CORE_NAME_PROP);
     
     // clear the leader in clusterstate
@@ -134,21 +141,10 @@ final class ShardLeaderElectionContext e
         collection);
     Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
     
-    waitForReplicasToComeUp(weAreReplacement);
-    
-    // wait for local leader state to clear...
-    // int tries = 0;
-    // while (zkController.getClusterState().getLeader(collection, shardId) !=
-    // null) {
-    // System.out.println("leader still shown " + tries + " " +
-    // zkController.getClusterState().getLeader(collection, shardId));
-    // Thread.sleep(1000);
-    // tries++;
-    // if (tries == 30) {
-    // break;
-    // }
-    // }
-    // Thread.sleep(1000);
+    String leaderVoteWait = cc.getZkController().getLeaderVoteWait();
+    if (leaderVoteWait != null) {
+      waitForReplicasToComeUp(weAreReplacement, leaderVoteWait);
+    }
     
     SolrCore core = null;
     try {
@@ -238,14 +234,14 @@ final class ShardLeaderElectionContext e
     
   }
 
-  private void waitForReplicasToComeUp(boolean weAreReplacement)
+  private void waitForReplicasToComeUp(boolean weAreReplacement, String leaderVoteWait)
       throws InterruptedException {
-    int retries = 300; // ~ 5 min
+    int timeout = Integer.parseInt(leaderVoteWait);
+    long timeoutAt = System.currentTimeMillis() + timeout;
+
     boolean tryAgain = true;
     Slice slices = zkController.getClusterState().getSlice(collection, shardId);
-    log.info("Running the leader process. afterExperiation=" + afterExpiration);
-    while (tryAgain || slices == null) {
-      
+    while (true && !isClosed) {
       // wait for everyone to be up
       if (slices != null) {
         Map<String,ZkNodeProps> shards = slices.getShards();
@@ -265,24 +261,23 @@ final class ShardLeaderElectionContext e
         if ((afterExpiration || !weAreReplacement)
             && found >= slices.getShards().size()) {
           log.info("Enough replicas found to continue.");
-          tryAgain = false;
+          break;
         } else if (!afterExpiration && found >= slices.getShards().size() - 1) {
           // a previous leader went down - wait for one less than the total
           // known shards
           log.info("Enough replicas found to continue.");
-          tryAgain = false;
+          break;
         } else {
-          log.info("Waiting until we see more replicas up");
+          log.info("Waiting until we see more replicas up: total=" + slices.getShards().size() + " found=" + found + " timeoutin=" + (timeoutAt - System.currentTimeMillis()));
         }
-        
-        retries--;
-        if (retries == 0) {
+  
+        if (System.currentTimeMillis() > timeoutAt) {
           log.info("Was waiting for replicas to come up, but they are taking too long - assuming they won't come back till later");
           break;
         }
       }
       if (tryAgain) {
-        Thread.sleep(1000);
+        Thread.sleep(500);
         slices = zkController.getClusterState().getSlice(collection, shardId);
       }
     }
@@ -306,6 +301,12 @@ final class ShardLeaderElectionContext e
   
   private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core) {
     log.info("Checking if I should try and be the leader.");
+    
+    if (isClosed) {
+      log.info("Bailing on leader process because we have been closed");
+      return false;
+    }
+    
     ClusterState clusterState = zkController.getZkStateReader().getClusterState();
     Map<String,Slice> slices = clusterState.getSlices(this.collection);
     Slice slice = slices.get(shardId);

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Mon Sep  3 19:46:47 2012
@@ -65,8 +65,14 @@ public class RecoveryStrategy extends Th
 
   private static Logger log = LoggerFactory.getLogger(RecoveryStrategy.class);
 
+  public static interface RecoveryListener {
+    public void recovered();
+    public void failed();
+  }
+  
   private volatile boolean close = false;
 
+  private RecoveryListener recoveryListener;
   private ZkController zkController;
   private String baseUrl;
   private String coreZkNodeName;
@@ -76,9 +82,10 @@ public class RecoveryStrategy extends Th
   private boolean recoveringAfterStartup;
   private CoreContainer cc;
   
-  public RecoveryStrategy(CoreContainer cc, String name) {
+  public RecoveryStrategy(CoreContainer cc, String name, RecoveryListener recoveryListener) {
     this.cc = cc;
     this.coreName = name;
+    this.recoveryListener = recoveryListener;
     setName("RecoveryThread");
     zkController = cc.getZkController();
     zkStateReader = zkController.getZkStateReader();
@@ -93,7 +100,7 @@ public class RecoveryStrategy extends Th
   // make sure any threads stop retrying
   public void close() {
     close = true;
-    log.warn("Stopping recovery for core " + coreName + " zkNodeName=" + coreZkNodeName);
+    log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName );
   }
 
   
@@ -105,6 +112,7 @@ public class RecoveryStrategy extends Th
       zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
     } finally {
       close();
+      recoveryListener.failed();
     }
   }
   
@@ -210,15 +218,15 @@ public class RecoveryStrategy extends Th
 
       try {
         doRecovery(core);
-      } catch (KeeperException e) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      } catch (InterruptedException e) {
+      }  catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         SolrException.log(log, "", e);
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
             e);
+      } catch (Throwable t) {
+        log.error("", t);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", t);
       }
     } finally {
       if (core != null) core.close();
@@ -258,38 +266,52 @@ public class RecoveryStrategy extends Th
 
     List<Long> startingVersions = ulog.getStartingVersions();
 
-
     if (startingVersions != null && recoveringAfterStartup) {
-      int oldIdx = 0;  // index of the start of the old list in the current list
-      long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-
-      for (; oldIdx<recentVersions.size(); oldIdx++) {
-        if (recentVersions.get(oldIdx) == firstStartingVersion) break;
-      }
-
-      if (oldIdx > 0) {
-        log.info("####### Found new versions added after startup: num=" + oldIdx);
-        log.info("###### currentVersions=" + recentVersions);
+      try {
+        int oldIdx = 0; // index of the start of the old list in the current
+                        // list
+        long firstStartingVersion = startingVersions.size() > 0 ? startingVersions
+            .get(0) : 0;
+        
+        for (; oldIdx < recentVersions.size(); oldIdx++) {
+          if (recentVersions.get(oldIdx) == firstStartingVersion) break;
+        }
+        
+        if (oldIdx > 0) {
+          log.info("####### Found new versions added after startup: num="
+              + oldIdx);
+          log.info("###### currentVersions=" + recentVersions);
+        }
+        
+        log.info("###### startupVersions=" + startingVersions);
+      } catch (Throwable t) {
+        SolrException.log(log, "Error getting recent versions. core=" + coreName, t);
+        recentVersions = new ArrayList<Long>(0);
       }
-
-      log.info("###### startupVersions=" + startingVersions);
     }
 
     if (recoveringAfterStartup) {
       // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
       // when we went down.  We may have received updates since then.
       recentVersions = startingVersions;
-
-      if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
-        // last operation at the time of startup had the GAP flag set...
-        // this means we were previously doing a full index replication
-        // that probably didn't complete and buffering updates in the meantime.
-        log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core=" + coreName);
-        firstTime = false;    // skip peersync
+      try {
+        if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
+          // last operation at the time of startup had the GAP flag set...
+          // this means we were previously doing a full index replication
+          // that probably didn't complete and buffering updates in the
+          // meantime.
+          log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core="
+              + coreName);
+          firstTime = false; // skip peersync
+        }
+      } catch (Throwable t) {
+        SolrException.log(log, "Error trying to get ulog starting operation. core="
+            + coreName, t);
+        firstTime = false; // skip peersync
       }
     }
 
-    while (!successfulRecovery && !isClosed() && !isInterrupted()) { // don't use interruption or it will close channels though
+    while (!successfulRecovery && !isInterrupted()) { // don't use interruption or it will close channels though
       try {
         CloudDescriptor cloudDesc = core.getCoreDescriptor()
             .getCloudDescriptor();
@@ -393,6 +415,7 @@ public class RecoveryStrategy extends Th
           zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
           close = true;
           successfulRecovery = true;
+          recoveryListener.recovered();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           log.warn("Recovery was interrupted", e);
@@ -421,10 +444,17 @@ public class RecoveryStrategy extends Th
         try {
 
           log.error("Recovery failed - trying again... core=" + coreName);
+          
+          if (isClosed()) {
+            retries = INTERRUPTED;
+          }
+          
           retries++;
           if (retries >= MAX_RETRIES) {
             if (retries == INTERRUPTED) {
-
+              SolrException.log(log, "Recovery failed - interrupted. core=" + coreName);
+              recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+                  core.getCoreDescriptor());
             } else {
               SolrException.log(log, "Recovery failed - max retries exceeded. core=" + coreName);
               recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
@@ -433,7 +463,7 @@ public class RecoveryStrategy extends Th
             break;
           }
 
-        } catch (Exception e) {
+        } catch (Throwable e) {
           SolrException.log(log, "core=" + coreName, e);
         }
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Sep  3 19:46:47 2012
@@ -123,6 +123,8 @@ public final class ZkController {
 
   protected volatile Overseer overseer;
 
+  private String leaderVoteWait;
+
   /**
    * @param cc
    * @param zkServerAddress
@@ -139,6 +141,27 @@ public final class ZkController {
   public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
       String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
       TimeoutException, IOException {
+    this(cc, zkServerAddress, zkClientTimeout, zkClientConnectTimeout, localHost, locaHostPort, localHostContext, null, registerOnReconnect);
+  }
+  
+
+  /**
+   * @param cc
+   * @param zkServerAddress
+   * @param zkClientTimeout
+   * @param zkClientConnectTimeout
+   * @param localHost
+   * @param locaHostPort
+   * @param localHostContext
+   * @param leaderVoteWait
+   * @param registerOnReconnect
+   * @throws InterruptedException
+   * @throws TimeoutException
+   * @throws IOException
+   */
+  public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
+      String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
+      TimeoutException, IOException {
     if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
     this.cc = cc;
     if (localHostContext.contains("/")) {
@@ -153,6 +176,7 @@ public final class ZkController {
     this.hostName = getHostNameFromAddress(this.localHost);
     this.nodeName = this.hostName + ':' + this.localHostPort + '_' + this.localHostContext;
     this.baseURL = this.localHost + ":" + this.localHostPort + "/" + this.localHostContext;
+    this.leaderVoteWait = leaderVoteWait;
 
     zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
         // on reconnect, reload cloud info
@@ -257,6 +281,10 @@ public final class ZkController {
     init(registerOnReconnect);
   }
 
+  public String getLeaderVoteWait() {
+    return leaderVoteWait;
+  }
+
   private void registerAllCoresAsDown(
       final CurrentCoreDescriptorProvider registerOnReconnect) {
     List<CoreDescriptor> descriptors = registerOnReconnect
@@ -282,6 +310,22 @@ public final class ZkController {
    */
   public void close() {
     try {
+      String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+      // we don't retry if there is a problem - count on ephem timeout
+      zkClient.delete(nodePath, -1, false);
+    } catch (KeeperException.NoNodeException e) {
+      // fine
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (KeeperException e) {
+      SolrException.log(log, "Error trying to remove our ephem live node", e);
+    }
+    
+    for (ElectionContext context : electionContexts.values()) {
+      context.close();
+    }
+    
+    try {
       overseer.close();
     } catch(Throwable t) {
       log.error("Error closing overseer", t);

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java Mon Sep  3 19:46:47 2012
@@ -34,6 +34,9 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.xml.parsers.ParserConfigurationException;
@@ -56,10 +59,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.util.DOMUtil;
-import org.apache.solr.util.FileUtils;
-import org.apache.solr.util.SystemIdResolver;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.core.SolrXMLSerializer.SolrCoreXMLDef;
 import org.apache.solr.core.SolrXMLSerializer.SolrXMLDef;
 import org.apache.solr.handler.admin.CollectionsHandler;
@@ -71,6 +71,10 @@ import org.apache.solr.logging.LogWatche
 import org.apache.solr.logging.jul.JulWatcher;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.util.DOMUtil;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.FileUtils;
+import org.apache.solr.util.SystemIdResolver;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,8 +143,10 @@ public class CoreContainer 
   protected LogWatcher logging = null;
   private String zkHost;
   private Map<SolrCore,String> coreToOrigName = new ConcurrentHashMap<SolrCore,String>();
+  private String leaderVoteWait;
 
-
+  private ThreadPoolExecutor cmdDistribExecutor;
+  
   {
     log.info("New CoreContainer " + System.identityHashCode(this));
   }
@@ -184,6 +190,10 @@ public class CoreContainer 
   }
 
   protected void initZooKeeper(String zkHost, int zkClientTimeout) {
+    cmdDistribExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new DefaultSolrThreadFactory("cmdDistribExecutor"));
+    
     // if zkHost sys property is not set, we are not using ZooKeeper
     String zookeeperHost;
     if(zkHost == null) {
@@ -227,7 +237,7 @@ public class CoreContainer 
         } else {
           log.info("Zookeeper client=" + zookeeperHost);          
         }
-        zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, new CurrentCoreDescriptorProvider() {
+        zkController = new ZkController(this, zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext, leaderVoteWait, new CurrentCoreDescriptorProvider() {
           
           @Override
           public List<CoreDescriptor> getCurrentDescriptors() {
@@ -286,6 +296,11 @@ public class CoreContainer 
     
   }
 
+  // may return null if not in zk mode
+  public ThreadPoolExecutor getCmdDistribExecutor() {
+    return cmdDistribExecutor;
+  }
+
   public Properties getContainerProperties() {
     return containerProperties;
   }
@@ -456,6 +471,8 @@ public class CoreContainer 
 
     hostContext = cfg.get("solr/cores/@hostContext", DEFAULT_HOST_CONTEXT);
     host = cfg.get("solr/cores/@host", null);
+    
+    leaderVoteWait = cfg.get("solr/cores/@leaderVoteWait", null);
 
     if(shareSchema){
       indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
@@ -601,6 +618,13 @@ public class CoreContainer 
         if (shardHandlerFactory != null) {
           shardHandlerFactory.close();
         }
+        if (cmdDistribExecutor != null) {
+          try {
+            ExecutorUtil.shutdownAndAwaitTermination(cmdDistribExecutor);
+          } catch (Throwable e) {
+            SolrException.log(log, e);
+          }
+        }
         // we want to close zk stuff last
         if(zkController != null) {
           zkController.close();

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java Mon Sep  3 19:46:47 2012
@@ -25,6 +25,7 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -163,7 +164,7 @@ public class HttpShardHandlerFactory ext
       SolrException.log(log, e);
     }
     try {
-      commExecutor.shutdownNow();
+      ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
     } catch (Throwable e) {
       SolrException.log(log, e);
     }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Mon Sep  3 19:46:47 2012
@@ -17,6 +17,13 @@ package org.apache.solr.handler.componen
  * limitations under the License.
  */
 
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
@@ -28,7 +35,7 @@ import org.apache.solr.common.SolrDocume
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
@@ -52,10 +59,6 @@ import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URL;
-import java.util.*;
-
 
 public class RealTimeGetComponent extends SearchComponent
 {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Mon Sep  3 19:46:47 2012
@@ -29,7 +29,7 @@ import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class DefaultSolrCoreState extends SolrCoreState {
+public final class DefaultSolrCoreState extends SolrCoreState implements RecoveryStrategy.RecoveryListener {
   public static Logger log = LoggerFactory.getLogger(DefaultSolrCoreState.class);
   
   private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
@@ -43,7 +43,7 @@ public final class DefaultSolrCoreState 
   private SolrIndexWriter indexWriter = null;
   private DirectoryFactory directoryFactory;
 
-  private boolean recoveryRunning;
+  private volatile boolean recoveryRunning;
   private RecoveryStrategy recoveryStrat;
   private boolean closed = false;
 
@@ -163,6 +163,7 @@ public final class DefaultSolrCoreState 
           log.error("Error during shutdown of directory factory.", t);
         }
         try {
+          log.info("Closing SolrCoreState - canceling any ongoing recovery");
           cancelRecovery();
         } catch (Throwable t) {
           log.error("Error cancelling recovery", t);
@@ -210,6 +211,7 @@ public final class DefaultSolrCoreState 
     }
     
     synchronized (recoveryLock) {
+      log.info("Running recovery - first canceling any ongoing recovery");
       cancelRecovery();
       
       while (recoveryRunning) {
@@ -229,7 +231,7 @@ public final class DefaultSolrCoreState 
       // if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
       boolean recoveringAfterStartup = recoveryStrat == null;
 
-      recoveryStrat = new RecoveryStrategy(cc, name);
+      recoveryStrat = new RecoveryStrategy(cc, name, this);
       recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
       recoveryStrat.start();
       recoveryRunning = true;
@@ -240,7 +242,7 @@ public final class DefaultSolrCoreState 
   @Override
   public void cancelRecovery() {
     synchronized (recoveryLock) {
-      if (recoveryStrat != null) {
+      if (recoveryStrat != null && recoveryRunning) {
         recoveryStrat.close();
         while (true) {
           try {
@@ -257,5 +259,15 @@ public final class DefaultSolrCoreState 
       }
     }
   }
+
+  @Override
+  public void recovered() {
+    recoveryRunning = false;
+  }
+
+  @Override
+  public void failed() {
+    recoveryRunning = false;
+  }
   
 }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Sep  3 19:46:47 2012
@@ -53,11 +53,6 @@ import org.slf4j.LoggerFactory;
 public class SolrCmdDistributor {
   private static final int MAX_RETRIES_ON_FORWARD = 6;
   public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
-  
-  // TODO: shut this thing down
-  static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
-      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-      new DefaultSolrThreadFactory("cmdDistribExecutor"));;
 
   static final HttpClient client;
   static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
@@ -90,7 +85,7 @@ public class SolrCmdDistributor {
     ModifiableSolrParams params;
   }
   
-  public SolrCmdDistributor(int numHosts) {
+  public SolrCmdDistributor(int numHosts, ThreadPoolExecutor executor) {
     int maxPermits = Math.max(8, (numHosts - 1) * 8);
     
     // limits how many tasks can actually execute at once
@@ -98,7 +93,7 @@ public class SolrCmdDistributor {
       semaphore.setMaxPermits(maxPermits);
     }
 
-    completionService = new ExecutorCompletionService<Request>(commExecutor);
+    completionService = new ExecutorCompletionService<Request>(executor);
     pending = new HashSet<Future<Request>>();
   }
   

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Sep  3 19:46:47 2012
@@ -164,15 +164,13 @@ public class DistributedUpdateProcessor 
     }
     //this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
 
-   
-    
     cloudDesc = coreDesc.getCloudDescriptor();
     
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
     }
 
-    cmdDistrib = new SolrCmdDistributor(numNodes);
+    cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getCmdDistribExecutor());
   }
 
   private List<Node> setupRequest(int hash) {
@@ -851,6 +849,10 @@ public class DistributedUpdateProcessor 
 
     // forward to all replicas
     if (leaderLogic && replicas != null) {
+      if (!req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader()) {
+        log.error("Abort sending request to replicas, we are no longer leader");
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Abort sending request to replicas, we are no longer leader");
+      }
       ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
       params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
       params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Mon Sep  3 19:46:47 2012
@@ -164,11 +164,14 @@ public class ChaosMonkeyNothingIsSafeTes
       // have request fails
       checkShardConsistency(false, true);
       
+      long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults()
+      .getNumFound(); 
+      
       // ensure we have added more than 0 docs
       long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
           .getResults().getNumFound();
       
-      assertTrue(cloudClientDocs > 0);
+      assertTrue("Found " + ctrlDocs + " control docs", cloudClientDocs > 0);
       
       if (VERBOSE) System.out.println("control docs:"
           + controlClient.query(new SolrQuery("*:*")).getResults()

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1380322&r1=1380321&r2=1380322&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Mon Sep  3 19:46:47 2012
@@ -21,6 +21,9 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -37,9 +40,12 @@ import org.apache.solr.common.util.Named
 import org.apache.solr.update.SolrCmdDistributor.Node;
 import org.apache.solr.update.SolrCmdDistributor.Response;
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
-  
+  private ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+      new DefaultSolrThreadFactory("cmdDistribExecutor"));
   
   public SolrCmdDistributorTest() {
     fixShardCount = true;
@@ -85,7 +91,7 @@ public class SolrCmdDistributorTest exte
   public void doTest() throws Exception {
     del("*:*");
     
-    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8);
+    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8, executor);
     
     ModifiableSolrParams params = new ModifiableSolrParams();
     List<Node> nodes = new ArrayList<Node>();
@@ -119,7 +125,7 @@ public class SolrCmdDistributorTest exte
     nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
     
     // add another 2 docs to control and 3 to client
-    cmdDistrib = new SolrCmdDistributor(8);
+    cmdDistrib = new SolrCmdDistributor(8, executor);
     cmd.solrDoc = sdoc("id", 2);
     cmdDistrib.distribAdd(cmd, nodes, params);
     
@@ -152,7 +158,7 @@ public class SolrCmdDistributorTest exte
     DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
     dcmd.id = "2";
     
-    cmdDistrib = new SolrCmdDistributor(8);
+    cmdDistrib = new SolrCmdDistributor(8, executor);
     cmdDistrib.distribDelete(dcmd, nodes, params);
     
     cmdDistrib.distribCommit(ccmd, nodes, params);
@@ -180,7 +186,7 @@ public class SolrCmdDistributorTest exte
     
     int id = 5;
     
-    cmdDistrib = new SolrCmdDistributor(8);
+    cmdDistrib = new SolrCmdDistributor(8, executor);
     
     nodes.clear();
     int cnt = atLeast(200);