You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/18 22:33:13 UTC

svn commit: r1233073 [1/2] - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/client/solrj/embedded/ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/handler/ core/src/java/org/...

Author: markrmiller
Date: Wed Jan 18 21:33:12 2012
New Revision: 1233073

URL: http://svn.apache.org/viewvc?rev=1233073&view=rev
Log:
updates around sync phase

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/util/DefaultSolrThreadFactory.java   (with props)
Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionIntegrationTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/RecoveryZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/SolrException.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Wed Jan 18 21:33:12 2012
@@ -88,6 +88,9 @@ public class JettySolrRunner {
     this.solrHome = solrHome;
     this.stopAtShutdown = stopAtShutdown;
     server.setStopAtShutdown(stopAtShutdown);
+    if (!stopAtShutdown) {
+      server.setGracefulShutdown(0);
+    }
     System.setProperty("solr.solr.home", solrHome);
     if (System.getProperty("jetty.testMode") != null) {
       // SelectChannelConnector connector = new SelectChannelConnector();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Jan 18 21:33:12 2012
@@ -3,13 +3,15 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -55,23 +57,25 @@ public abstract class ElectionContext {
     this.leaderProps = leaderProps;
   }
   
-  abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException;
+  abstract void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException, IOException;
 }
 
-final class ShardLeaderElectionContext extends ElectionContext {
+
+
+class ShardLeaderElectionContextBase extends ElectionContext {
   
-  private final SolrZkClient zkClient;
+  protected final SolrZkClient zkClient;
   private ZkStateReader zkStateReader;
-  private String shardId;
-  private String collection;
-  private SolrCore core;
+  protected String shardId;
+  protected String collection;
+  protected LeaderElector leaderElector;
 
-  public ShardLeaderElectionContext(SolrCore core, final String shardId,
+  public ShardLeaderElectionContextBase(LeaderElector leaderElector, final String shardId,
       final String collection, final String shardZkNodeName, ZkNodeProps props, ZkStateReader zkStateReader) {
     super(shardZkNodeName, ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/leader_elect/"
         + shardId, ZkStateReader.getShardLeadersPath(collection, shardId),
         props);
-    this.core = core;
+    this.leaderElector = leaderElector;
     this.zkClient = zkStateReader.getZkClient();
     this.zkStateReader = zkStateReader;
     this.shardId = shardId;
@@ -79,23 +83,16 @@ final class ShardLeaderElectionContext e
   }
 
   @Override
-  void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
-    System.out.println("run leader process");
-    System.out.println(weAreReplacement + " " + core);
-    // TODO: move sync stuff to a better spot
-    if (weAreReplacement && core != null) { // TODO: core can be null in tests
-      if (zkClient.exists(leaderPath, true)) {
-        zkClient.delete(leaderPath, -1, true);
-      }
-      System.out.println("SYNC UP");
-      syncReplicas();
-    }
+  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core)
+      throws KeeperException, InterruptedException, IOException {
+
     try {
       zkClient.makePath(leaderPath,
           leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
           CreateMode.EPHEMERAL, true);
     } catch (NodeExistsException e) {
-      // if a previous leader ephemeral still exists for some reason, try and remove it
+      // if a previous leader ephemeral still exists for some reason, try and
+      // remove it
       zkClient.delete(leaderPath, -1, true);
       zkClient.makePath(leaderPath,
           leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
@@ -103,50 +100,170 @@ final class ShardLeaderElectionContext e
     }
   }
 
-  private void syncReplicas() {
+ 
+
+
+  
+
+  
+  public static ModifiableSolrParams params(String... params) {
+    ModifiableSolrParams msp = new ModifiableSolrParams();
+    for (int i=0; i<params.length; i+=2) {
+      msp.add(params[i], params[i+1]);
+    }
+    return msp;
+  }
+}
+
+final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
+  private ZkController zkController;
+  
+  public ShardLeaderElectionContext(LeaderElector leaderElector, 
+      final String shardId, final String collection,
+      final String shardZkNodeName, ZkNodeProps props, ZkController zkController) {
+    super(leaderElector, shardId, collection, shardZkNodeName, props,
+        zkController.getZkStateReader());
+    this.zkController = zkController;
+  }
+  
+  @Override
+  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core)
+      throws KeeperException, InterruptedException, IOException {
+    
+    // TODO: move sync stuff to a better spot??
+    if (weAreReplacement && core != null) { // TODO: core can be null in tests
+      if (zkClient.exists(leaderPath, true)) {
+        zkClient.delete(leaderPath, -1, true);
+      }
+      // nocommit
+      System.out.println("SYNC UP");
+      boolean success = syncReplicas(core);
+      if (!success) {
+        // remove our ephemeral election node and rejoin the election
+        System.out.println("sync failed, delete our election node:"
+            + leaderSeqPath);
+        zkController.publish(core, ZkStateReader.DOWN);
+        zkClient.delete(leaderSeqPath, -1, true);
+        
+        core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+        
+        leaderElector.joinElection(this, core);
+        return;
+      }
+      
+    }
+    
+    // If I am going to be the leader I have to be active
+    if (core != null) {
+      core.getUpdateHandler().getSolrCoreState().cancelRecovery();
+      zkController.publish(core, ZkStateReader.ACTIVE);
+    }
+    
+    super.runLeaderProcess(leaderSeqPath, weAreReplacement, core);
+  }
+  
+  private boolean syncReplicas(SolrCore core) {
+    boolean success = false;
     try {
       // nocommit
-//      System.out.println("I am the new Leader:" + leaderPath
-//          + " - I need to request all of my replicas to go into sync mode");
+      System.out.println("I may be the new Leader:" + leaderPath
+          + " - I need to request all of my replicas to go into sync mode");
       
       // first sync ourselves - we are the potential leader after all
-      boolean success = sync(leaderProps);
+      try {
+        success = sync(core, leaderProps);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+      
+      // if !success but no one else is in active mode,
+      // we are the leader anyway
+      // nocommit: should we also be leader if there is only one other active?
+      // if we couldn't sync with it, it shouldn't be able to sync with us
+      if (!success && !areAnyOtherReplicasActive(leaderProps)) {
+        System.out
+            .println("wasnt a success but no on else i active! I am the leader");
+        
+        success = true;
+      }
+
       if (success) {
+        // nocommit
         System.out.println("Sync success");
         // we are the leader - tell all of our replias to sync with us
         
         // sync everyone else
         // TODO: we should do this in parallel at least
-        List<ZkCoreNodeProps> nodes = zkStateReader.getReplicaProps(collection, shardId,
-            leaderProps.get(ZkStateReader.NODE_NAME_PROP), leaderProps.get(ZkStateReader.CORE_PROP));
+        List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
+            .getReplicaProps(collection, shardId,
+                leaderProps.get(ZkStateReader.NODE_NAME_PROP),
+                leaderProps.get(ZkStateReader.CORE_PROP), ZkStateReader.ACTIVE);
         if (nodes != null) {
           for (ZkCoreNodeProps node : nodes) {
             try {
               sync(leaderProps, node.getNodeProps());
-            } catch(Exception exception) {
+            } catch (Exception exception) {
               exception.printStackTrace();
-              //nocommit
+              // nocommit
             }
           }
         }
       } else {
         // nocommit: we cannot be the leader - go into recovery
         // but what if no one can be the leader in a loop?
+        // perhaps we look down the list and if no one is active, we
+        // accept leader role anyhow
+        
+        // nocommit
         System.out.println("Sync failure");
       }
       
-
-      
     } catch (Exception e) {
       // nocommit
       e.printStackTrace();
     }
+    
+    return success;
   }
-
-  private boolean sync(ZkNodeProps props) throws MalformedURLException, SolrServerException,
-      IOException {
-    List<ZkCoreNodeProps> nodes = zkStateReader.getReplicaProps(collection, shardId,
-        props.get(ZkStateReader.NODE_NAME_PROP), props.get(ZkStateReader.CORE_PROP));
+  
+  protected boolean areAnyOtherReplicasActive(ZkNodeProps leaderProps) {
+    CloudState cloudState = zkController.getZkStateReader().getCloudState();
+    Map<String,Slice> slices = cloudState.getSlices(this.collection);
+    Slice slice = slices.get(shardId);
+    Map<String,ZkNodeProps> shards = slice.getShards();
+    for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
+      String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+      System.out.println("state:"
+          + state
+          + shard.getValue().get(ZkStateReader.NODE_NAME_PROP)
+          + " live: "
+          + cloudState.liveNodesContain(shard.getValue().get(
+              ZkStateReader.NODE_NAME_PROP)));
+      if ((state.equals(ZkStateReader.ACTIVE))
+          && cloudState.liveNodesContain(shard.getValue().get(
+              ZkStateReader.NODE_NAME_PROP))
+          && !new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
+              new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
+        System.out.println(" FOUND ACTIVE");
+        return true;
+      }
+    }
+    
+    System.out.println(" DIDNT FOUND ACTIVE");
+    return false;
+  }
+  
+  private boolean sync(SolrCore core, ZkNodeProps props) throws MalformedURLException,
+      SolrServerException, IOException {
+    List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
+        .getReplicaProps(collection, shardId,
+            props.get(ZkStateReader.NODE_NAME_PROP),
+            props.get(ZkStateReader.CORE_PROP), ZkStateReader.ACTIVE); // TODO:
+                                                                       // should
+                                                                       // there
+                                                                       // be a
+                                                                       // state
+                                                                       // filter?
     
     if (nodes == null) {
       // I have no replicas
@@ -157,17 +274,19 @@ final class ShardLeaderElectionContext e
     for (ZkCoreNodeProps node : nodes) {
       syncWith.add(node.getCoreUrl());
     }
-
-    // TODO: do we first everyone register as sync phase? get the overseer to do it?
+    
+    // TODO: do we first everyone register as sync phase? get the overseer to do
+    // it?
     PeerSync peerSync = new PeerSync(core, syncWith, 1000);
     return peerSync.sync();
   }
   
   private void sync(ZkNodeProps leader, ZkNodeProps props)
       throws MalformedURLException, SolrServerException, IOException {
-    List<ZkCoreNodeProps> nodes = zkStateReader.getReplicaProps(collection,
-        shardId, props.get(ZkStateReader.NODE_NAME_PROP),
-        props.get(ZkStateReader.CORE_PROP));
+    List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
+        .getReplicaProps(collection, shardId,
+            props.get(ZkStateReader.NODE_NAME_PROP),
+            props.get(ZkStateReader.CORE_PROP));
     
     if (nodes == null) {
       // I have no replicas
@@ -194,14 +313,6 @@ final class ShardLeaderElectionContext e
       }
     }
   }
-  
-  public static ModifiableSolrParams params(String... params) {
-    ModifiableSolrParams msp = new ModifiableSolrParams();
-    for (int i=0; i<params.length; i+=2) {
-      msp.add(params[i], params[i+1]);
-    }
-    return msp;
-  }
 }
 
 final class OverseerElectionContext extends ElectionContext {
@@ -216,7 +327,7 @@ final class OverseerElectionContext exte
   }
 
   @Override
-  void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException {
+  void runLeaderProcess(String leaderSeqPath, boolean weAreReplacement, SolrCore core) throws KeeperException, InterruptedException {
     new Overseer(zkClient, stateReader);
   }
   

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Wed Jan 18 21:33:12 2012
@@ -30,6 +30,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.core.SolrCore;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -74,6 +75,7 @@ public  class LeaderElector {
    * If it is, set the leaderId on the leader zk node. If it is not, start
    * watching the candidate that is in line before this one - if it goes down, check
    * if this candidate is the leader again.
+   * @param leaderSeqPath zk path of this candidate's election node (its sequence number is parsed from it)
    * 
    * @param seq
    * @param context 
@@ -82,7 +84,7 @@ public  class LeaderElector {
    * @throws IOException 
    * @throws UnsupportedEncodingException
    */
-  private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
+  private void checkIfIamLeader(final String leaderSeqPath, final int seq, final ElectionContext context, boolean replacement, final SolrCore core) throws KeeperException,
       InterruptedException, IOException {
     // get all other numbers...
     final String holdElectionPath = context.electionPath + ELECTION_NODE;
@@ -91,7 +93,7 @@ public  class LeaderElector {
     sortSeqs(seqs);
     List<Integer> intSeqs = getSeqs(seqs);
     if (seq <= intSeqs.get(0)) {
-      runIamLeaderProcess(context, replacement);
+      runIamLeaderProcess(leaderSeqPath, context, replacement, core);
     } else {
       // I am not the leader - watch the node below me
       int i = 1;
@@ -115,7 +117,7 @@ public  class LeaderElector {
               public void process(WatchedEvent event) {
                 // am I the next leader?
                 try {
-                  checkIfIamLeader(seq, context, true);
+                  checkIfIamLeader(leaderSeqPath, seq, context, true, core);
                 } catch (KeeperException e) {
                   log.warn("", e);
                   
@@ -132,17 +134,19 @@ public  class LeaderElector {
       } catch (KeeperException.SessionExpiredException e) {
         throw e;
       } catch (KeeperException e) {
+        e.printStackTrace(System.out);
         // we couldn't set our watch - the node before us may already be down?
         // we need to check if we are the leader again
-        checkIfIamLeader(seq, context, true);
+        checkIfIamLeader(leaderSeqPath, seq, context, true, core);
       }
     }
   }
 
-  protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
-      InterruptedException {
+  // TODO: get this core param out of here
+  protected void runIamLeaderProcess(String leaderSeqPath, final ElectionContext context, boolean weAreReplacement, SolrCore core) throws KeeperException,
+      InterruptedException, IOException {
 
-    context.runLeaderProcess(weAreReplacement);
+    context.runLeaderProcess(leaderSeqPath, weAreReplacement, core);
   }
   
   /**
@@ -197,13 +201,14 @@ public  class LeaderElector {
    * watch the next lowest numbered node.
    * 
    * @param context
+   * @param core - optional - sometimes null
    * @return sequential node number
    * @throws KeeperException
    * @throws InterruptedException
    * @throws IOException 
    * @throws UnsupportedEncodingException
    */
-  public int joinElection(ElectionContext context) throws KeeperException, InterruptedException, IOException {
+  public int joinElection(ElectionContext context, SolrCore core) throws KeeperException, InterruptedException, IOException {
     final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
     
     long sessionId = zkClient.getSolrZooKeeper().getSessionId();
@@ -245,7 +250,7 @@ public  class LeaderElector {
       }
     }
     int seq = getSeq(leaderSeqPath);
-    checkIfIamLeader(seq, context, false);
+    checkIfIamLeader(leaderSeqPath, seq, context, false, core);
     
     return seq;
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java Wed Jan 18 21:33:12 2012
@@ -20,8 +20,9 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
-import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
@@ -30,29 +31,34 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RecoveryStrat {
-  private static final int MAX_RETRIES = 10;
-
+public class RecoveryStrat extends Thread {
+  private static final int MAX_RETRIES = 100;
+  private static final int START_TIMEOUT = 100;
+  
   private static final String REPLICATION_HANDLER = "/replication";
 
   private static Logger log = LoggerFactory.getLogger(RecoveryStrat.class);
-  
-  private volatile RecoveryListener recoveryListener;
 
   private volatile boolean close = false;
+
+
+  private ZkController zkController;
+  private String baseUrl;
+  private String coreZkNodeName;
+  private ZkStateReader zkStateReader;
+  private volatile String coreName;
+  private int retries;
+  private SolrCore core;
   
   // for now, just for tests
   public interface RecoveryListener {
@@ -61,164 +67,28 @@ public class RecoveryStrat {
     public void finishedRecovery();
   }
   
+  public RecoveryStrat(SolrCore core) {
+    this.core = core;
+    this.coreName = core.getName();
+    setDaemon(true);
+    
+    zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+    zkStateReader = zkController.getZkStateReader();
+    baseUrl = zkController.getBaseUrl();
+    coreZkNodeName = zkController.getNodeName() + "_" + coreName;
+    
+  }
+  
   // make sure any threads stop retrying
   public void close() {
     close = true;
-  }
-  
-  // TODO: we want to be pretty noisy if we don't properly recover?
-  public void recover(final SolrCore core) {
-   
-    final ZkController zkController = core.getCoreDescriptor()
-        .getCoreContainer().getZkController();
-    final ZkStateReader zkStateReader = zkController.getZkStateReader();
-    final String baseUrl = zkController.getBaseUrl();
-    final String shardZkNodeName = zkController.getNodeName() + "_"
-        + core.getName();
-    final CloudDescriptor cloudDesc = core.getCoreDescriptor()
-        .getCloudDescriptor();
- 
-    core.getUpdateHandler().getSolrCoreState().recoveryRequests.incrementAndGet();
-    try {
-      log.info("Start recovery process");
-      if (recoveryListener != null) recoveryListener.startRecovery();
-
-
-    } catch (Exception e) {
-      log.error("", e);
-      core.getUpdateHandler().getSolrCoreState().recoveryRequests.decrementAndGet();
-      recoveryFailed(core, zkController, baseUrl, shardZkNodeName,
-          cloudDesc);
-      return;
-    }
+    interrupt();
+    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+    if (ulog == null) return;
+    ulog.cancelApplyBufferedUpdates();
     
-    Thread thread = new Thread() {
-      {
-        setDaemon(true);
-      }
-      
-      @Override
-      public void run() {
-        synchronized (core.getUpdateHandler().getSolrCoreState().getRecoveryLock()) {
-          
-          UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-          if (ulog == null) return;
-
-          boolean replayed = false;
-          boolean succesfulRecovery = false;
-          int retries = 0;
-          while (!succesfulRecovery && !close) {
-            if (core.getCoreDescriptor().getCoreContainer().isShutDown()) {
-              return;
-            }
-            ulog.bufferUpdates();  
-            replayed = false;
-            try {
-              zkController.publishAsRecoverying(baseUrl, cloudDesc, shardZkNodeName,
-                  core.getName());
-              
-              ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
-                  cloudDesc.getCollectionName(), cloudDesc.getShardId());
-              // nocommit
-              // System.out.println("recover " + shardZkNodeName + " against " + leaderprops);
-              replicate(zkController.getNodeName(), core, shardZkNodeName, leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName()));
-              
-              replay(core);
-              replayed = true;
-              
-              // if there are pending recovery requests, don't advert as active
-              if (core.getUpdateHandler().getSolrCoreState().recoveryRequests
-                  .get() == 1) {
-                zkController.publishAsActive(baseUrl, cloudDesc,
-                    shardZkNodeName, core.getName());
-              }
-              
-              if (recoveryListener != null) recoveryListener.finishedRecovery();
-
-              succesfulRecovery = true;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              log.error("Recovery was interrupted", e);
-              retries = MAX_RETRIES;
-            } catch (Throwable t) {
-              log.error("Error while trying to recover", t);
-            } finally {
-              if (!replayed) {
-                try {
-                  ulog.dropBufferedUpdates();
-                } catch (Exception e) {
-                  log.error("", e);
-                }
-              }
-              if (succesfulRecovery) {
-                core.getUpdateHandler().getSolrCoreState().recoveryRequests.decrementAndGet();
-              }
-            }
-            
-            if (!succesfulRecovery) {
-              // lets pause for a moment and we need to try again...
-              // TODO: we don't want to retry for some problems?
-              // Or do a fall off retry...
-              try {
-              log.error("Recovery failed - trying again...");
-              retries++;
-              if (retries >= MAX_RETRIES) {
-                // TODO: for now, give up after 10 tries - should we do more?
-                recoveryFailed(core, zkController, baseUrl, shardZkNodeName,
-                    cloudDesc);
-              }
-              
-              zkController.publishAsDown(baseUrl, cloudDesc, shardZkNodeName,
-                  core.getName());
-              
-              } catch (Exception e) {
-                log.error("", e);
-              }
-              
-              try {
-                Thread.sleep(500);
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.error("Recovery was interrupted", e);
-                retries = MAX_RETRIES;
-              }
-            }
-          }
-          log.info("Finished recovery process");
-        }
-      }
-      
-      private Future<RecoveryInfo> replay(final SolrCore core)
-          throws InterruptedException, ExecutionException {
-        Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog()
-            .applyBufferedUpdates();
-        if (future == null) {
-          // no replay needed\
-          log.info("No replay needed");
-        } else {
-          // wait for replay
-          future.get();
-        }
-        
-        // nocommit
-//        try {
-//          RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
-//          SolrIndexSearcher searcher = searchHolder.get();
-//          try {
-//            System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replayed "
-//                + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
-//          } finally {
-//            searchHolder.decref();
-//          }
-//        } catch (Exception e) {
-//          
-//        }
-        
-        return future;
-      }
-    };
-    thread.start();
   }
+
   
   private void recoveryFailed(final SolrCore core,
       final ZkController zkController, final String baseUrl,
@@ -253,7 +123,7 @@ public class RecoveryStrat {
       prepCmd.setCoreNodeName(shardZkNodeName);
       
       server.request(prepCmd);
-      
+      server.shutdown();
       
       // use rep handler directly, so we can do this sync rather than async
       SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
@@ -270,14 +140,13 @@ public class RecoveryStrat {
       ModifiableSolrParams solrParams = new ModifiableSolrParams();
       solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
       
+      if (close) retries = MAX_RETRIES; 
       boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure fore=true does not download files we already have
 
       if (!success) {
         throw new RuntimeException("Replication for recovery failed.");
       }
       
-      if (recoveryListener != null) recoveryListener.finishedReplication();
-      
       // nocommit
 //      try {
 //        RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
@@ -294,11 +163,118 @@ public class RecoveryStrat {
     }
   }
   
-  public RecoveryListener getRecoveryListener() {
-    return recoveryListener;
+  @Override
+  public void run() {
+    
+    boolean replayed = false;
+    boolean succesfulRecovery = false;
+    
+    while (!succesfulRecovery && !close && !isInterrupted()) {
+      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+      if (ulog == null) return;
+      
+      ulog.bufferUpdates();
+      replayed = false;
+      CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+      try {
+        zkController.publish(core, ZkStateReader.RECOVERING);
+        
+        ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
+            cloudDesc.getCollectionName(), cloudDesc.getShardId());
+        // nocommit
+        // System.out.println("recover " + shardZkNodeName + " against " +
+        // leaderprops);
+        replicate(zkController.getNodeName(), core, coreZkNodeName,
+            leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, coreName));
+        
+        replay(ulog);
+        replayed = true;
+        
+        // if there are pending recovery requests, don't advertise as active
+        
+        zkController.publishAsActive(baseUrl, cloudDesc, coreZkNodeName,
+            coreName);
+        
+        succesfulRecovery = true;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.warn("Recovery was interrupted", e);
+        retries = MAX_RETRIES;
+      } catch (Throwable t) {
+        SolrException.log(log, "Error while trying to recover", t);
+      } finally {
+        if (!replayed) {
+          try {
+            ulog.dropBufferedUpdates();
+          } catch (Exception e) {
+            log.error("", e);
+          }
+        }
+        
+      }
+      
+      if (!succesfulRecovery) {
+        // let's pause for a moment; we need to try again...
+        // TODO: we don't want to retry for some problems?
+        // Or do a fall off retry...
+        try {
+
+          SolrException.log(log, "Recovery failed - trying again...");
+          retries++;
+          if (retries >= MAX_RETRIES) {
+            // TODO: for now, give up after 10 tries - should we do more?
+            recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+                cloudDesc);
+            break;
+          }
+          
+          zkController.publishAsDown(baseUrl, cloudDesc, coreZkNodeName,
+              core.getName());
+          
+        } catch (Exception e) {
+          SolrException.log(log, "", e);
+        }
+        
+        try {
+          Thread.sleep(Math.min(START_TIMEOUT * retries, 60000));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.warn("Recovery was interrupted", e);
+          retries = MAX_RETRIES;
+        }
+      }
+      
+      log.info("Finished recovery process");
+      
+    }
   }
 
-  public void setRecoveryListener(RecoveryListener recoveryListener) {
-    this.recoveryListener = recoveryListener;
+  private Future<RecoveryInfo> replay(UpdateLog ulog)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    Future<RecoveryInfo> future = ulog.applyBufferedUpdates();
+    if (future == null) {
+      // no replay needed
+      log.info("No replay needed");
+    } else {
+      // wait for replay
+      future.get(5, TimeUnit.MINUTES); // nocommit
+    }
+    
+    // nocommit
+//    try {
+//      RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+//      SolrIndexSearcher searcher = searchHolder.get();
+//      try {
+//        System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replayed "
+//            + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+//      } finally {
+//        searchHolder.decref();
+//      }
+//    } catch (Exception e) {
+//      
+//    }
+    
+    return future;
   }
+
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Jan 18 21:33:12 2012
@@ -49,7 +49,6 @@ import org.apache.solr.core.CoreContaine
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.SolrCmdDistributor.Node;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -101,8 +100,6 @@ public final class ZkController {
   private String hostName;
 
   private LeaderElector overseerElector;
-
-  private RecoveryStrat recoveryStrat = new RecoveryStrat();
   
   private boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
 
@@ -169,7 +166,7 @@ public final class ZkController {
               Overseer.createClientNodes(zkClient, getNodeName());
 
               ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
-              overseerElector.joinElection(context);
+              overseerElector.joinElection(context, null);
               zkStateReader.createClusterStateWatchersAndUpdate();
               
               List<CoreDescriptor> descriptors = registerOnReconnect
@@ -231,8 +228,6 @@ public final class ZkController {
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
           "", e);
     }
-    
-    recoveryStrat.close();
   }
 
   /**
@@ -332,7 +327,7 @@ public final class ZkController {
       overseerElector = new LeaderElector(zkClient);
       ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
       overseerElector.setup(context);
-      overseerElector.joinElection(context);
+      overseerElector.joinElection(context, null);
       zkStateReader.createClusterStateWatchersAndUpdate();
       
     } catch (IOException e) {
@@ -512,17 +507,26 @@ public final class ZkController {
     
     SolrCore core = null;
     
+
+    
     CoreContainer cc = desc.getCoreContainer();
     if (cc != null) { // CoreContainer only null in tests
       try {
         core = cc.getCore(desc.getName());
-        checkRecovery(coreName, desc, recoverReloadedCores, baseUrl, cloudDesc,
+        joinElection(collection, shardZkNodeName, shardId, leaderProps, core);
+        boolean success = checkRecovery(coreName, desc, recoverReloadedCores, baseUrl, cloudDesc,
             collection, shardZkNodeName, shardId, leaderProps, core, cc);
+        if (success) {
+          publishAsActive(baseUrl, cloudDesc, shardZkNodeName, coreName);
+        }
       } finally {
         if (core != null) {
           core.close();
         }
       }
+    } else {
+      joinElection(collection, shardZkNodeName, shardId, leaderProps, core);
+      publishAsActive(baseUrl, cloudDesc, shardZkNodeName, coreName);
     }
     
     // make sure we have an update cluster state right away
@@ -532,71 +536,69 @@ public final class ZkController {
   }
 
 
-  private void checkRecovery(String coreName, final CoreDescriptor desc,
+  private void joinElection(final String collection,
+      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
+      SolrCore core) throws InterruptedException, KeeperException, IOException {
+    ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
+        collection, shardZkNodeName, leaderProps, this);
+    
+    leaderElector.setup(context);
+    leaderElector.joinElection(context, core);
+  }
+
+
+  private boolean checkRecovery(String coreName, final CoreDescriptor desc,
       boolean recoverReloadedCores, final String baseUrl,
       final CloudDescriptor cloudDesc, final String collection,
       final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
       SolrCore core, CoreContainer cc) throws InterruptedException,
       KeeperException, IOException, ExecutionException {
-    try {
-      ElectionContext context = new ShardLeaderElectionContext(core, shardId,
-          collection, shardZkNodeName, leaderProps, zkStateReader);
-      
-      leaderElector.setup(context);
-      leaderElector.joinElection(context);
+    
+    String leaderUrl = zkStateReader.getLeaderUrl(collection,
+        cloudDesc.getShardId(), 30000);
+    
+    boolean doRecovery = true;
+    String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+    log.info("We are " + ourUrl + " and leader is " + leaderUrl);
+    if (leaderUrl.equals(ourUrl)) {
+      doRecovery = false;
       
-      String leaderUrl = zkStateReader.getLeaderUrl(collection,
-          cloudDesc.getShardId(), 15000);
+      // recover from local transaction log and wait for it to complete before
+      // going active
+      // TODO: should this be moved to another thread? To recoveryStrat?
+      // TODO: should this actually be done earlier, before (or as part of)
+      // leader election perhaps?
+      // TODO: ensure that a replica that is trying to recover waits until I'm
+      // active (or don't make me the
+      // leader until my local replay is done). But this replay is only needed
+      // on the leader - replicas
+      // will do recovery anyway
       
-      boolean doRecovery = true;
-      String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-      log.info("We are " + ourUrl + " and leader is " + leaderUrl);
-      if (leaderUrl.equals(ourUrl)) {
-        doRecovery = false;
-        
-        // recover from local transaction log and wait for it to complete before
-        // going active
-        // TODO: should this be moved to another thread? To recoveryStrat?
-        // TODO: should this actually be done earlier, before (or as part of)
-        // leader election perhaps?
-        // TODO: ensure that a replica that is trying to recover waits until I'm
-        // active (or don't make me the
-        // leader until my local replay is done. But this replay is only needed
-        // on the leader - replicas
-        // will do recovery anyway
-        
-        core = cc.getCore(desc.getName());
-        UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-        if (!core.isReloaded() && ulog != null) {
-          Future<UpdateLog.RecoveryInfo> recoveryFuture = core
-              .getUpdateHandler().getUpdateLog().recoverFromLog();
-          if (recoveryFuture != null) {
-            recoveryFuture.get(); // NOTE: this could potentially block for
-                                  // minutes or more!
-            // TODO: public as recovering in the mean time?
-          }
-        }
-        
-        // publish new props
-        publishAsActive(baseUrl, cloudDesc, shardZkNodeName, coreName);
-      } else {
-        
-        core = cc.getCore(desc.getName());
-        
-        if (core.isReloaded() && !recoverReloadedCores) {
-          doRecovery = false;
+      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+      if (!core.isReloaded() && ulog != null) {
+        Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+            .getUpdateLog().recoverFromLog();
+        if (recoveryFuture != null) {
+          recoveryFuture.get(); // NOTE: this could potentially block for
+                                // minutes or more!
+          // TODO: publish as recovering in the meantime?
         }
       }
+      return false;
+    } else {
       
-      if (doRecovery && !SKIP_AUTO_RECOVERY) {
-        log.info("Core needs to recover:" + core.getName());
-        recoveryStrat.recover(core);
-      }
-    } finally {
-      if (core != null) {
-        core.close();
+      if (core.isReloaded() && !recoverReloadedCores) {
+        doRecovery = false;
       }
     }
+    
+    if (doRecovery && !SKIP_AUTO_RECOVERY) {
+      log.info("Core needs to recover:" + core.getName());
+      core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+      return true;
+    }
+    
+    return false;
   }
 
 
@@ -616,16 +618,17 @@ public final class ZkController {
     finalProps.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
     publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
   }
-  
-  void publishAsRecoverying(String baseUrl,
-      final CloudDescriptor cloudDesc, String shardZkNodeName, String coreName) {
+
+  public void publish(SolrCore core, String state) {
+    CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
     Map<String,String> finalProps = new HashMap<String,String>();
-    finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
-    finalProps.put(ZkStateReader.CORE_PROP, coreName);
+    finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
+    finalProps.put(ZkStateReader.CORE_PROP, core.getName());
     finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+    finalProps.put(ZkStateReader.STATE_PROP, state);
     finalProps.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
-    publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
+    publishState(cloudDesc, getNodeName() + "_" + core.getName(),
+        core.getName(), finalProps);
   }
   
   void publishAsDown(String baseUrl,
@@ -861,10 +864,6 @@ public final class ZkController {
     }
     throw new SolrException(ErrorCode.SERVER_ERROR, "Could not get shard_id for core: " + coreName);
   }
-
-  public RecoveryStrat getRecoveryStrat() {
-    return recoveryStrat;
-  }
   
   public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
     File[] files = dir.listFiles();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java Wed Jan 18 21:33:12 2012
@@ -39,6 +39,7 @@ import org.apache.solr.cloud.SolrZkServe
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkSolrResourceLoader;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.DOMUtil;
@@ -433,6 +434,7 @@ public class CoreContainer 
     synchronized(cores) {
       try {
         for(SolrCore core : cores.values()) {
+          core.getUpdateHandler().getSolrCoreState().cancelRecovery();
           if (!core.isClosed()) {
             core.close();
           }
@@ -515,6 +517,10 @@ public class CoreContainer 
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
             e);
       } catch (Exception e) {
+        // if register fails, this is really bad - close the zkController to
+        // minimize any damage we can cause
+        zkController.publish(core, ZkStateReader.DOWN);
+        zkController.close();
         log.error("", e);
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
             e);

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Wed Jan 18 21:33:12 2012
@@ -43,6 +43,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.IndexReader;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -286,7 +287,7 @@ public class ReplicationHandler extends 
       }
       return tempSnapPuller.fetchLatestIndex(core, force);
     } catch (Exception e) {
-      LOG.error("SnapPull failed ", e);
+      SolrException.log(LOG, "SnapPull failed ", e);
     } finally {
       tempSnapPuller = snapPuller;
       snapPullLock.unlock();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Wed Jan 18 21:33:12 2012
@@ -595,15 +595,14 @@ public class CoreAdminHandler extends Re
     if (cname == null) {
       cname = "";
     }
-    
     SolrCore core = coreContainer.getCore(cname);
     try {
-      RecoveryStrat recoveryStrat = new RecoveryStrat();
-      recoveryStrat.recover(core);
+      core.getUpdateHandler().getSolrCoreState().doRecovery(core);
     } finally {
       // no recoveryStrat close for now
       core.close();
     }
+    
   }
   
   protected void handlePrepRecoveryAction(SolrQueryRequest req,

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java Wed Jan 18 21:33:12 2012
@@ -347,7 +347,7 @@ public class HttpShardHandler extends Sh
             for (ZkNodeProps nodeProps : sliceShards.values()) {
               ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
               if (!liveNodes.contains(coreNodeProps.getNodeName())
-                  && coreNodeProps.getState().equals(
+                  || !coreNodeProps.getState().equals(
                       ZkStateReader.ACTIVE)) continue;
               if (first) {
                 first = false;

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java Wed Jan 18 21:33:12 2012
@@ -16,6 +16,13 @@ package org.apache.solr.handler.componen
  * limitations under the License.
  */
 
+import java.net.MalformedURLException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
@@ -23,17 +30,11 @@ import org.apache.commons.httpclient.par
 import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.core.PluginInfo;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.MalformedURLException;
-import java.util.Random;
-import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 
 public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized{
   protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class);
@@ -48,8 +49,9 @@ public class HttpShardHandlerFactory ext
           0,
           Integer.MAX_VALUE,
           5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
-          new SynchronousQueue<Runnable>()  // directly hand off tasks
-  );
+          new SynchronousQueue<Runnable>(),  // directly hand off tasks
+          new DefaultSolrThreadFactory("httpShardExecutor")
+   );
 
 
   HttpClient client;
@@ -58,6 +60,8 @@ public class HttpShardHandlerFactory ext
   int soTimeout = 0; //current default values
   int connectionTimeout = 0; //current default values
   public  String scheme = "http://"; //current default values
+
+  private MultiThreadedHttpConnectionManager mgr;
  // socket timeout measured in ms, closes a socket if read
   // takes longer than x ms to complete. throws
   // java.net.SocketTimeoutException: Read timed out exception
@@ -97,7 +101,7 @@ public class HttpShardHandlerFactory ext
           log.info("Setting shard-connection-timeout to: " + connectionTimeout);
         }
     }
-    MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+    mgr = new MultiThreadedHttpConnectionManager();
     mgr.getParams().setDefaultMaxConnectionsPerHost(20);
     mgr.getParams().setMaxTotalConnections(10000);
     mgr.getParams().setConnectionTimeout(connectionTimeout);
@@ -118,4 +122,10 @@ public class HttpShardHandlerFactory ext
     }
 
   }
+
+  @Override
+  public void close() {
+    mgr.shutdown();
+    loadbalancer.shutdown();
+  }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java Wed Jan 18 21:33:12 2012
@@ -319,7 +319,7 @@ public class QueryComponent extends Sear
             for (ZkNodeProps nodeProps : sliceShards.values()) {
               ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
               if (!liveNodes.contains(coreNodeProps.getNodeName())
-                  && coreNodeProps.getState().equals(
+                  || !coreNodeProps.getState().equals(
                       ZkStateReader.ACTIVE)) continue;
               if (first) {
                 first = false;

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Jan 18 21:33:12 2012
@@ -23,6 +23,7 @@ import org.apache.solr.common.params.Com
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.util.RTimer;
+import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.RequestHandlerBase;
@@ -137,6 +138,15 @@ public class SearchHandler extends Reque
     } else {
       shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
     }
+    core.addCloseHook(new CloseHook() {
+      @Override
+      public void preClose(SolrCore core) {
+        shardHandlerFactory.close();
+      }
+      @Override
+      public void postClose(SolrCore core) {
+      }
+    });
   }
 
   public List<SearchComponent> getComponents() {

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java Wed Jan 18 21:33:12 2012
@@ -20,4 +20,6 @@ package org.apache.solr.handler.componen
 public abstract class ShardHandlerFactory {
 
   public abstract ShardHandler getShardHandler();
+
+  public abstract void close();
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/request/SimpleFacets.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/request/SimpleFacets.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/request/SimpleFacets.java Wed Jan 18 21:33:12 2012
@@ -39,6 +39,7 @@ import org.apache.solr.schema.*;
 import org.apache.solr.search.*;
 import org.apache.solr.util.BoundedTreeSet;
 import org.apache.solr.util.DateMathParser;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.handler.component.ResponseBuilder;
 import org.apache.solr.util.LongPriorityQueue;
 
@@ -329,6 +330,7 @@ public class SimpleFacets {
           Integer.MAX_VALUE,
           10, TimeUnit.SECONDS, // terminate idle threads after 10 sec
           new SynchronousQueue<Runnable>()  // directly hand off tasks
+          , new DefaultSolrThreadFactory("facetExectutor")
   );
   
   /**

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Wed Jan 18 21:33:12 2012
@@ -20,14 +20,23 @@ package org.apache.solr.update;
 import java.io.IOException;
 
 import org.apache.lucene.index.IndexWriter;
+import org.apache.solr.cloud.RecoveryStrat;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
 
 public final class DefaultSolrCoreState extends SolrCoreState {
+ 
+  private final Object recoveryLock = new Object();
   private int refCnt = 1;
   private SolrIndexWriter indexWriter = null;
   private DirectoryFactory directoryFactory;
+ 
 
+ 
+
+  private boolean recoveryRunning;
+  private RecoveryStrat recoveryStrat;
+  
   public DefaultSolrCoreState(DirectoryFactory directoryFactory) {
     this.directoryFactory = directoryFactory;
   }
@@ -87,5 +96,40 @@ public final class DefaultSolrCoreState 
   public DirectoryFactory getDirectoryFactory() {
     return directoryFactory;
   }
+
+  @Override
+  public void doRecovery(SolrCore core) {
+    cancelRecovery();
+    synchronized (recoveryLock) {
+      while (recoveryRunning) {
+        try {
+          recoveryLock.wait(1000);
+        } catch (InterruptedException e) {
+
+        }
+      }
+      
+      recoveryStrat = new RecoveryStrat(core);
+      recoveryStrat.start();
+      recoveryRunning = true;
+    }
+    
+  }
+  
+  @Override
+  public void cancelRecovery() {
+    synchronized (recoveryLock) {
+      if (recoveryStrat != null) {
+        recoveryStrat.close();
+        try {
+          recoveryStrat.join();
+        } catch (InterruptedException e) {
+          
+        }
+        recoveryRunning = false;
+        recoveryLock.notifyAll();
+      }
+    }
+  }
   
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Wed Jan 18 21:33:12 2012
@@ -97,7 +97,7 @@ public class DirectUpdateHandler2 extend
   public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) throws IOException {
     super(core);
     if (updateHandler instanceof DirectUpdateHandler2) {
-      this.solrCoreState = ((DirectUpdateHandler2)updateHandler).solrCoreState;
+      this.solrCoreState = ((DirectUpdateHandler2) updateHandler).solrCoreState;
     } else {
       // the impl has changed, so we cannot use the old state - decref it
       updateHandler.decref();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed Jan 18 21:33:12 2012
@@ -43,14 +43,18 @@ import org.apache.solr.common.cloud.ZkCo
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+
 
 
 
 public class SolrCmdDistributor {
   // TODO: shut this thing down
+  // TODO: this cannot be per instance...
   static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
-      Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
-  
+      Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+      new DefaultSolrThreadFactory("cmdDistribExecutor"));
+
   static HttpClient client;
   
   static {
@@ -64,7 +68,7 @@ public class SolrCmdDistributor {
   Set<Future<Request>> pending;
   
   int maxBufferedAddsPerServer = 10;
-  int maxBufferedDeletesPerServer = 100;
+  int maxBufferedDeletesPerServer = 10;
 
   private Response response = new Response();
   
@@ -81,6 +85,10 @@ public class SolrCmdDistributor {
     ModifiableSolrParams params;
   }
   
+  /** Creates a distributor; the constructor itself performs no setup. */
+  public SolrCmdDistributor() {}
+  
   public void finish() {
 
     // piggyback on any outstanding adds or deletes if possible.

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java Wed Jan 18 21:33:12 2012
@@ -18,7 +18,6 @@ package org.apache.solr.update;
  */
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.index.IndexWriter;
 import org.apache.solr.core.DirectoryFactory;
@@ -31,11 +30,6 @@ import org.apache.solr.core.SolrCore;
  */
 public abstract class SolrCoreState {
   
-  // need a per core lock over reloads...
-  private final Object recoveryLock = new Object();
-  
-  public final AtomicInteger recoveryRequests = new AtomicInteger();
-  
   /**
    * Force the creation of a new IndexWriter using the settings from the given
    * SolrCore.
@@ -86,8 +80,8 @@ public abstract class SolrCoreState {
     public void closeWriter(IndexWriter writer) throws IOException;
   }
 
+  public abstract void doRecovery(SolrCore core);
+  
+  public abstract void cancelRecovery();
 
-  public Object getRecoveryLock() {
-    return recoveryLock;
-  }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Wed Jan 18 21:33:12 2012
@@ -17,6 +17,24 @@
 
 package org.apache.solr.update;
 
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -31,16 +49,11 @@ import org.apache.solr.update.processor.
 import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
 import org.apache.solr.update.processor.RunUpdateProcessorFactory;
 import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.plugin.PluginInfoInitialized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-
 /** @lucene.experimental */
 public class UpdateLog implements PluginInfoInitialized {
   public static Logger log = LoggerFactory.getLogger(UpdateLog.class);
@@ -104,6 +117,7 @@ public class UpdateLog implements Plugin
   private SyncLevel defaultSyncLevel = SyncLevel.FLUSH;
 
   private volatile UpdateHandler uhandler;    // a core reload can change this reference!
+  private volatile boolean cancelApplyBufferUpdate;
 
 
   public static class LogPtr {
@@ -765,6 +779,7 @@ public class UpdateLog implements Plugin
     // reading state and acting on it in the update processor
     versionInfo.blockUpdates();
     try {
+      cancelApplyBufferUpdate = false;
       if (state != State.BUFFERING) return null;
 
       // handle case when no log was even created because no updates
@@ -845,7 +860,7 @@ public class UpdateLog implements Plugin
 
         for(;;) {
           Object o = null;
-
+          if (cancelApplyBufferUpdate) break;
           try {
             if (testing_logReplayHook != null) testing_logReplayHook.run();
             o = null;
@@ -1000,9 +1015,14 @@ public class UpdateLog implements Plugin
       if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();
     }
   }
+  
+  /**
+   * Raises the volatile cancelApplyBufferUpdate flag. A loop elsewhere in
+   * this class checks the flag at the top of each iteration and breaks out
+   * once it is set.
+   */
+  public void cancelApplyBufferedUpdates() {
+    cancelApplyBufferUpdate = true;
+  }
 
-  ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
-      1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+  ThreadPoolExecutor recoveryExecutor = new ThreadPoolExecutor(0,
+      Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+      new DefaultSolrThreadFactory("recoveryExecutor"));
 
 }
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Jan 18 21:33:12 2012
@@ -629,7 +629,7 @@ public class DistributedUpdateProcessor 
         String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
             .getZkController().getNodeName();
         String shardZkNodeName = nodeName + "_" + req.getCore().getName();
-        List<Node> nodes = getReplicaUrls(req, req.getCore().getCoreDescriptor()
+        List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
             .getCloudDescriptor().getCollectionName(), shardZkNodeName);
 
         if (nodes != null) {
@@ -672,7 +672,7 @@ public class DistributedUpdateProcessor 
         String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
             .getZkController().getNodeName();
         String shardZkNodeName = nodeName + "_" + req.getCore().getName();
-        List<Node> nodes = getReplicaUrls(req, req.getCore().getCoreDescriptor()
+        List<Node> nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
             .getCloudDescriptor().getCollectionName(), shardZkNodeName);
 
         if (nodes != null) {
@@ -692,7 +692,7 @@ public class DistributedUpdateProcessor 
  
 
   
-  private List<Node> getReplicaUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
+  private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
     CloudState cloudState = req.getCore().getCoreDescriptor()
         .getCoreContainer().getZkController().getCloudState();
     List<Node> urls = new ArrayList<Node>();

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/util/DefaultSolrThreadFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/util/DefaultSolrThreadFactory.java?rev=1233073&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/util/DefaultSolrThreadFactory.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/util/DefaultSolrThreadFactory.java Wed Jan 18 21:33:12 2012
@@ -0,0 +1,49 @@
+package org.apache.solr.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link ThreadFactory} that creates non-daemon, normal-priority threads
+ * named {@code <namePrefix>-<poolNumber>-thread-<threadNumber>}.
+ * <p>
+ * The pool number comes from a counter shared by all instances of this
+ * class; the thread number is counted per factory instance. Both start at 1.
+ */
+public class DefaultSolrThreadFactory implements ThreadFactory {
+  /** Pool counter shared by every factory instance, whatever its prefix. */
+  private static final AtomicInteger poolNumber = new AtomicInteger(1);
+  /** Thread group that all threads of this factory are created in. */
+  private final ThreadGroup group;
+  /** Number given to the next thread created by this factory. */
+  private final AtomicInteger threadNumber = new AtomicInteger(1);
+  /** Name prefix, already including the pool number and "-thread-". */
+  private final String prefix;
+
+  /**
+   * Creates a factory and claims the next pool number for it.
+   *
+   * @param namePrefix leading part of the name of every thread created
+   */
+  public DefaultSolrThreadFactory(String namePrefix) {
+    final SecurityManager securityManager = System.getSecurityManager();
+    // Use the security manager's thread group when one is installed,
+    // otherwise the group of the thread constructing this factory.
+    if (securityManager == null) {
+      group = Thread.currentThread().getThreadGroup();
+    } else {
+      group = securityManager.getThreadGroup();
+    }
+    final int pool = poolNumber.getAndIncrement();
+    prefix = namePrefix + "-" + pool + "-thread-";
+  }
+
+  /**
+   * Creates a new, unstarted thread for the given task.
+   *
+   * @param r the task the thread will run
+   * @return a non-daemon thread with {@link Thread#NORM_PRIORITY}
+   */
+  @Override
+  public Thread newThread(Runnable r) {
+    final String threadName = prefix + threadNumber.getAndIncrement();
+    final Thread thread = new Thread(group, r, threadName, 0);
+
+    thread.setDaemon(false);
+
+    if (thread.getPriority() != Thread.NORM_PRIORITY) {
+      thread.setPriority(Thread.NORM_PRIORITY);
+    }
+    return thread;
+  }
+}
\ No newline at end of file

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java Wed Jan 18 21:33:12 2012
@@ -291,7 +291,7 @@ public class ChaosMonkey {
     
     int chance = random.nextInt(10);
     JettySolrRunner jetty;
-    if (chance <= 8 && aggressivelyKillLeaders) {
+    if (chance <= 5 && aggressivelyKillLeaders) {
       // if killLeader, really aggressively go after leaders
       Collection<CloudJettyRunner> leaders = shardToLeaderJetty.values();
       List<CloudJettyRunner> leadersList = new ArrayList<CloudJettyRunner>(leaders.size());
@@ -331,7 +331,7 @@ public class ChaosMonkey {
   
   // synchronously starts and stops shards randomly, unless there is only one
   // active shard up for a slice or if there is one active and others recovering
-  public void startTheMonkey(boolean killLeaders) {
+  public void startTheMonkey(boolean killLeaders, final int roundPause) {
     this.aggressivelyKillLeaders = killLeaders;
     
     // TODO: when kill leaders is on, lets kill a higher percentage of leaders
@@ -344,7 +344,7 @@ public class ChaosMonkey {
       public void run() {
         while (!stop) {
           try {
-            Thread.sleep(300);
+            Thread.sleep(roundPause);
  
             if (random.nextBoolean()) {
              if (!deadPool.isEmpty()) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Wed Jan 18 21:33:12 2012
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
@@ -37,6 +39,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 
 public class ChaosMonkeyNothingIsSafeTest extends FullSolrCloudTest {
+
   
   @BeforeClass
   public static void beforeSuperClass() throws Exception {
@@ -53,14 +56,13 @@ public class ChaosMonkeyNothingIsSafeTes
   public void setUp() throws Exception {
     super.setUp();
     // TODO use @Noisy annotation as we expect lots of exceptions
-    
+    ignoreException(".*");
     System.setProperty("numShards", Integer.toString(sliceCount));
   }
   
   @Override
   @After
   public void tearDown() throws Exception {
-    printLayout();
     super.tearDown();
     resetExceptionIgnores();
   }
@@ -96,11 +98,12 @@ public class ChaosMonkeyNothingIsSafeTes
     threads.add(ftIndexThread);
     ftIndexThread.start();
     
-    chaosMonkey.startTheMonkey(true);
-    
-    Thread.sleep(atLeast(20000));
-    
-    chaosMonkey.stopTheMonkey();
+    chaosMonkey.startTheMonkey(true, 1500);
+    try {
+      Thread.sleep(atLeast(12000));
+    } finally {
+      chaosMonkey.stopTheMonkey();
+    }
     
     for (StopableIndexingThread indexThread : threads) {
       indexThread.safeStop();
@@ -184,17 +187,26 @@ public class ChaosMonkeyNothingIsSafeTes
   }
 
   class FullThrottleStopableIndexingThread extends StopableIndexingThread {
+    MultiThreadedHttpConnectionManager cm = new MultiThreadedHttpConnectionManager();
+    private HttpClient httpClient = new HttpClient(cm) ;
     private volatile boolean stop = false;
     int clientIndex = 0;
     private StreamingUpdateSolrServer suss;
     private List<SolrServer> clients;  
     
-    public FullThrottleStopableIndexingThread(List<SolrServer> clients, int startI, boolean doDeletes) throws MalformedURLException {
+    public FullThrottleStopableIndexingThread(List<SolrServer> clients,
+        int startI, boolean doDeletes) throws MalformedURLException {
       super(startI, doDeletes);
       setName("FullThrottleStopableIndexingThread");
       setDaemon(true);
       this.clients = clients;
-      suss = new StreamingUpdateSolrServer(((CommonsHttpSolrServer) clients.get(0)).getBaseURL(), 2, 2);
+      suss = new StreamingUpdateSolrServer(
+          ((CommonsHttpSolrServer) clients.get(0)).getBaseURL(), httpClient, 8,
+          2) {
+        public void handleError(Throwable ex) {
+          log.warn("suss error", ex);
+        }
+      };
     }
     
     @Override
@@ -213,14 +225,16 @@ public class ChaosMonkeyNothingIsSafeTes
             suss.deleteById(Integer.toString(delete));
           } catch (Exception e) {
             changeUrlOnError(e);
-            System.err.println("REQUEST FAILED:");
-            e.printStackTrace();
+            //System.err.println("REQUEST FAILED:");
+            //e.printStackTrace();
             fails.incrementAndGet();
           }
         }
         
         try {
           numAdds++;
+          if (numAdds > 4000)
+            continue;
           SolrInputDocument doc = getDoc(
               id,
               i,
@@ -233,8 +247,8 @@ public class ChaosMonkeyNothingIsSafeTes
           suss.add(doc);
         } catch (Exception e) {
           changeUrlOnError(e);
-          System.err.println("REQUEST FAILED:");
-          e.printStackTrace();
+          //System.err.println("REQUEST FAILED:");
+          //e.printStackTrace();
           fails.incrementAndGet();
         }
         
@@ -255,7 +269,13 @@ public class ChaosMonkeyNothingIsSafeTes
         }
         try {
           suss.shutdownNow();
-          suss = new StreamingUpdateSolrServer(((CommonsHttpSolrServer) clients.get(clientIndex)).getBaseURL(), 30, 3);
+          suss = new StreamingUpdateSolrServer(
+              ((CommonsHttpSolrServer) clients.get(clientIndex)).getBaseURL(),
+              httpClient, 30, 3) {
+            public void handleError(Throwable ex) {
+              log.warn("suss error", ex);
+            }
+          };
         } catch (MalformedURLException e1) {
           e1.printStackTrace();
         }
@@ -265,6 +285,7 @@ public class ChaosMonkeyNothingIsSafeTes
     public void safeStop() {
       stop = true;
       suss.shutdownNow();
+      cm.shutdown();
     }
 
     public int getFails() {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Wed Jan 18 21:33:12 2012
@@ -48,13 +48,7 @@ public class ChaosMonkeySafeLeaderTest e
   public void setUp() throws Exception {
     super.setUp();
     // we expect this time of exception as shards go up and down...
-    ignoreException("shard update error ");
-    ignoreException("Connection refused");
-    ignoreException("interrupted waiting for shard update response");
-    ignoreException("org\\.mortbay\\.jetty\\.EofException");
-    ignoreException("java\\.lang\\.InterruptedException");
-    ignoreException("java\\.nio\\.channels\\.ClosedByInterruptException");
-    
+    ignoreException(".*");
     
     // sometimes we cannot get the same port
     ignoreException("java\\.net\\.BindException: Address already in use");
@@ -94,7 +88,7 @@ public class ChaosMonkeySafeLeaderTest e
       indexThread.start();
     }
     
-    chaosMonkey.startTheMonkey(false);
+    chaosMonkey.startTheMonkey(false, 500);
     
     Thread.sleep(atLeast(8000));
     
@@ -118,7 +112,7 @@ public class ChaosMonkeySafeLeaderTest e
     // wait until there are no recoveries...
     waitForThingsToLevelOut();
 
-    checkShardConsistency();
+    checkShardConsistency(true, true);
     
     if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java?rev=1233073&r1=1233072&r2=1233073&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java Wed Jan 18 21:33:12 2012
@@ -64,8 +64,9 @@ public class FullSolrCloudDistribCmdsTes
     handle.put("QTime", SKIPVAL);
     handle.put("timestamp", SKIPVAL);
     
-    waitForRecoveriesToFinish(VERBOSE);
+    waitForRecoveriesToFinish(true); // nocommit verbose
     
+    printLayout();
     // add a doc, update it, and delete it
     
     QueryResponse results;
@@ -152,6 +153,10 @@ public class FullSolrCloudDistribCmdsTes
           // expected
           fails++;
           break;
+        } catch (SolrServerException e) {
+          // expected
+          fails++;
+          break;
         }
       }
     } finally {