You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/13 16:38:02 UTC

svn commit: r1231134 - in /lucene/dev/branches/solrcloud/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/handler/admin/ core/src/java/org/apache/solr/update/ core/src/test/org/apache/so...

Author: markrmiller
Date: Fri Jan 13 15:38:01 2012
New Revision: 1231134

URL: http://svn.apache.org/viewvc?rev=1231134&view=rev
Log:
harden the hell out of recovery

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/branches/solrcloud/solr/testlogging.properties

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java Fri Jan 13 15:38:01 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
@@ -35,8 +36,10 @@ import org.apache.solr.core.RequestHandl
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
+import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,8 +83,7 @@ public class RecoveryStrat {
       log.info("Start recovery process");
       if (recoveryListener != null) recoveryListener.startRecovery();
 
-      zkController.publishAsRecoverying(baseUrl, cloudDesc, shardZkNodeName,
-          core.getName());
+
     } catch (Exception e) {
       log.error("", e);
       core.getUpdateHandler().getSolrCoreState().recoveryRequests.decrementAndGet();
@@ -102,16 +104,20 @@ public class RecoveryStrat {
           UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
           if (ulog == null) return;
 
-          ulog.bufferUpdates();  
           boolean replayed = false;
           boolean succesfulRecovery = false;
           int retries = 0;
           while (!succesfulRecovery && !close) {
+            ulog.bufferUpdates();  
+            zkController.publishAsRecoverying(baseUrl, cloudDesc, shardZkNodeName,
+                core.getName());
+            replayed = false;
             try {
               ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
                   cloudDesc.getCollectionName(), cloudDesc.getShardId());
-              
-              replicate(core, shardZkNodeName, leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName()));
+              // nocommit
+              // System.out.println("recover " + shardZkNodeName + " against " + leaderprops);
+              replicate(zkController.getNodeName(), core, shardZkNodeName, leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, core.getName()));
               
               replay(core);
               replayed = true;
@@ -150,6 +156,7 @@ public class RecoveryStrat {
               // lets pause for a moment and we need to try again...
               // TODO: we don't want to retry for some problems?
               // Or do a fall off retry...
+              try {
               log.error("Recovery failed - trying again...");
               retries++;
               if (retries >= MAX_RETRIES) {
@@ -158,11 +165,19 @@ public class RecoveryStrat {
                     cloudDesc);
               }
               
+              zkController.publishAsDown(baseUrl, cloudDesc, shardZkNodeName,
+                  core.getName());
+              
+              } catch (Exception e) {
+                log.error("", e);
+              }
+              
               try {
                 Thread.sleep(500);
               } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
+                log.error("Recovery was interrupted", e);
+                retries = MAX_RETRIES;
               }
             }
           }
@@ -181,6 +196,21 @@ public class RecoveryStrat {
           // wait for replay
           future.get();
         }
+        
+        // nocommit
+//        try {
+//          RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+//          SolrIndexSearcher searcher = searchHolder.get();
+//          try {
+//            System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replayed "
+//                + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+//          } finally {
+//            searchHolder.decref();
+//          }
+//        } catch (Exception e) {
+//          
+//        }
+        
         return future;
       }
     };
@@ -196,7 +226,7 @@ public class RecoveryStrat {
     close = true;
   }
   
-  private void replicate(SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
+  private void replicate(String nodeName, SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
       throws SolrServerException, IOException {
     // start buffer updates to tran log
     // and do recovery - either replay via realtime get (eventually)
@@ -217,7 +247,8 @@ public class RecoveryStrat {
       PrepRecovery prepCmd = new PrepRecovery();
       prepCmd.setAction(CoreAdminAction.PREPRECOVERY);
       prepCmd.setCoreName(leaderCoreName);
-      prepCmd.setNodeName(shardZkNodeName);
+      prepCmd.setNodeName(nodeName);
+      prepCmd.setCoreNodeName(shardZkNodeName);
       
       server.request(prepCmd);
       
@@ -237,9 +268,27 @@ public class RecoveryStrat {
       ModifiableSolrParams solrParams = new ModifiableSolrParams();
       solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
       
-      replicationHandler.doFetch(solrParams);
+      boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure fore=true does not download files we already have
+
+      if (!success) {
+        throw new RuntimeException("Replication for recovery failed.");
+      }
       
       if (recoveryListener != null) recoveryListener.finishedReplication();
+      
+      // nocommit
+      try {
+        RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+        SolrIndexSearcher searcher = searchHolder.get();
+        try {
+          System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replicated "
+              + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " from " + leaderUrl + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir());
+        } finally {
+          searchHolder.decref();
+        }
+      } catch (Exception e) {
+        
+      }
     }
   }
   

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Jan 13 15:38:01 2012
@@ -170,10 +170,24 @@ public final class ZkController {
               overseerElector.joinElection(context);
               zkStateReader.createClusterStateWatchersAndUpdate();
               
-              // re register all descriptors
               List<CoreDescriptor> descriptors = registerOnReconnect
                   .getCurrentDescriptors();
               if (descriptors != null) {
+                // before registering as live, make sure everyone is in a
+                // recovery state
+                for (CoreDescriptor descriptor : descriptors) {
+                  final String shardZkNodeName = getNodeName() + "_"
+                      + descriptor.getName();
+                  publishAsDown(getBaseUrl(), descriptor.getCloudDescriptor(), shardZkNodeName,
+                      descriptor.getName());
+                }
+              }
+              
+              // we have to register as live first to pick up docs in the buffer
+              createEphemeralLiveNode();
+              
+              // re register all descriptors
+              if (descriptors != null) {
                 for (CoreDescriptor descriptor : descriptors) {
                   // TODO: we need to think carefully about what happens when it was
                   // a leader that was expired - as well as what to do about leaders/overseers
@@ -181,10 +195,7 @@ public final class ZkController {
                   register(descriptor.getName(), descriptor, true);
                 }
               }
-              
-              // don't advertise as live until everyone has registered
-              createEphemeralLiveNode();
-
+  
             } catch (InterruptedException e) {
               // Restore the interrupted status
               Thread.currentThread().interrupt();
@@ -465,7 +476,7 @@ public final class ZkController {
     props.put(ZkStateReader.CORE_PROP, coreName);
     props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
     props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
-    props.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+    props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
     if(shardId!=null) {
       props.put(ZkStateReader.SHARD_ID_PROP, shardId);
     }
@@ -595,13 +606,24 @@ public final class ZkController {
     publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
   }
   
+  void publishAsDown(String baseUrl,
+      final CloudDescriptor cloudDesc, String shardZkNodeName, String coreName) {
+    Map<String,String> finalProps = new HashMap<String,String>();
+    finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+    finalProps.put(ZkStateReader.CORE_PROP, coreName);
+    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+    finalProps.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
+    publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
+  }
+  
   void publishAsRecoveryFailed(String baseUrl,
       final CloudDescriptor cloudDesc, String shardZkNodeName, String coreName) {
     Map<String,String> finalProps = new HashMap<String,String>();
     finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
     finalProps.put(ZkStateReader.CORE_PROP, coreName);
     finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERY_FAILED);
     finalProps.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
     publishState(cloudDesc, shardZkNodeName, coreName, finalProps);
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Fri Jan 13 15:38:01 2012
@@ -129,6 +129,8 @@ public class ReplicationHandler extends 
     // It gives the current 'replicateable' index version
     if (command.equals(CMD_INDEX_VERSION)) {
       IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
+      // nocommit
+      //System.out.println("The latest index gen is:" + commitPoint.getGeneration() + " " + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
       if (commitPoint != null && replicationEnabled.get()) {
         //
         // There is a race condition here.  The commit point may be changed / deleted by the time
@@ -163,7 +165,7 @@ public class ReplicationHandler extends 
       new Thread() {
         @Override
         public void run() {
-          doFetch(paramsCopy);
+          doFetch(paramsCopy, false);
         }
       }.start();
       rsp.add(STATUS, OK_STATUS);
@@ -271,10 +273,10 @@ public class ReplicationHandler extends 
 
   private volatile SnapPuller tempSnapPuller;
 
-  public void doFetch(SolrParams solrParams) {
+  public boolean doFetch(SolrParams solrParams, boolean force) {
     String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
     if (!snapPullLock.tryLock())
-      return;
+      return false;
     try {
       tempSnapPuller = snapPuller;
       if (masterUrl != null) {
@@ -282,13 +284,14 @@ public class ReplicationHandler extends 
         nl.remove(SnapPuller.POLL_INTERVAL);
         tempSnapPuller = new SnapPuller(nl, this, core);
       }
-      tempSnapPuller.fetchLatestIndex(core);
+      return tempSnapPuller.fetchLatestIndex(core, force);
     } catch (Exception e) {
       LOG.error("SnapPull failed ", e);
     } finally {
       tempSnapPuller = snapPuller;
       snapPullLock.unlock();
     }
+    return false;
   }
 
   boolean isReplicating() {
@@ -335,6 +338,8 @@ public class ReplicationHandler extends 
     }
     long version = Long.parseLong(v);
     IndexCommit commit = core.getDeletionPolicy().getCommitPoint(version);
+    //nocommit
+    //System.out.println("ask for files for gen:" + commit.getGeneration() + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
     if (commit == null) {
       rsp.add("status", "invalid indexversion");
       return;

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Fri Jan 13 15:38:01 2012
@@ -159,7 +159,7 @@ public class SnapPuller {
         }
         try {
           executorStartTime = System.currentTimeMillis();
-          replicationHandler.doFetch(null);
+          replicationHandler.doFetch(null, false);
         } catch (Exception e) {
           LOG.error("Exception in fetching index", e);
         }
@@ -244,7 +244,8 @@ public class SnapPuller {
   @SuppressWarnings("unchecked")
   boolean successfulInstall = false;
 
-  boolean fetchLatestIndex(SolrCore core) throws IOException {
+  boolean fetchLatestIndex(SolrCore core, boolean force) throws IOException {
+    successfulInstall = false;
     replicationStartTime = System.currentTimeMillis();
     try {
       //get the current 'replicateable' index version in the master
@@ -257,9 +258,10 @@ public class SnapPuller {
       }
       long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
       long latestGeneration = (Long) response.get(GENERATION);
-      if (latestVersion == 0L) {
+      if (latestVersion == 0L && !force) {
         //there is nothing to be replicated
-        return false;
+        successfulInstall = true;
+        return true;
       }
       IndexCommit commit;
       RefCounted<SolrIndexSearcher> searcherRefCounted = null;
@@ -275,9 +277,11 @@ public class SnapPuller {
           searcherRefCounted.decref();
       }
       if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
-        //master and slave are alsready in sync just return
+        //master and slave are already in sync just return
         LOG.info("Slave in sync with master.");
-        return false;
+        System.out.println("SLAVE IN SYNC:" + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
+        successfulInstall = true;
+        return true;
       }
       LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
       LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
@@ -294,7 +298,7 @@ public class SnapPuller {
       filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
       // if the generateion of master is older than that of the slave , it means they are not compatible to be copied
       // then a new index direcory to be created and all the files need to be copied
-      boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;
+      boolean isFullCopyNeeded = commit.getVersion() >= latestVersion || force;
       File tmpIndexDir = createTempindexDir(core);
       if (isIndexStale())
         isFullCopyNeeded = true;
@@ -336,6 +340,7 @@ public class SnapPuller {
         return successfulInstall;
       } catch (ReplicationHandlerException e) {
         LOG.error("User aborted Replication");
+        return false;
       } catch (SolrException e) {
         throw e;
       } catch (Exception e) {
@@ -344,9 +349,9 @@ public class SnapPuller {
         if (deleteTmpIdxDir) delTree(tmpIndexDir);
         else delTree(indexDir);
       }
-      return successfulInstall;
     } finally {
       if (!successfulInstall) {
+        System.out.println("replication failed handler:" + core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName());
         logReplicationTimeAndConfFiles(null, successfulInstall);
       }
       filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Fri Jan 13 15:38:01 2012
@@ -19,12 +19,14 @@ package org.apache.solr.handler.admin;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.IOUtils;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.RecoveryStrat;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
@@ -590,16 +592,18 @@ public class CoreAdminHandler extends Re
     }
   }
   
-  protected void handlePrepRecoveryAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, InterruptedException {
+  protected void handlePrepRecoveryAction(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws IOException, InterruptedException {
     final SolrParams params = req.getParams();
-
+    
     String cname = params.get(CoreAdminParams.CORE);
     if (cname == null) {
       cname = "";
     }
-
+    
     String nodeName = params.get("nodeName");
-
+    String coreNodeName = params.get("coreNodeName");
+    
     SolrCore core = coreContainer.getCore(cname);
     if (core == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "core not found:" + cname);
@@ -612,51 +616,75 @@ public class CoreAdminHandler extends Re
         // to accept updates
         CloudDescriptor cloudDescriptor = core.getCoreDescriptor()
             .getCloudDescriptor();
-        ZkNodeProps nodeProps = coreContainer
+        CloudState cloudState = coreContainer
             .getZkController()
-            .getCloudState()
-            .getSlice(cloudDescriptor.getCollectionName(),
-                cloudDescriptor.getShardId()).getShards().get(nodeName);
+            .getCloudState();
+        ZkNodeProps nodeProps = 
+            cloudState.getSlice(cloudDescriptor.getCollectionName(),
+                cloudDescriptor.getShardId()).getShards().get(coreNodeName);
         state = nodeProps.get(ZkStateReader.STATE_PROP);
-        if (nodeProps != null && state.equals(ZkStateReader.RECOVERING)) {
+        if (nodeProps != null && state.equals(ZkStateReader.RECOVERING)
+            && cloudState.liveNodesContain(nodeName)) {
           break;
         }
         
         if (retry++ == 30) {
           throw new SolrException(ErrorCode.BAD_REQUEST,
               "I was asked to prep for recovery for " + nodeName
-                  + " but she is not in a recovery state - state: " + state);
+                  + " but she is not live or not in a recovery state - state: " + state);
         }
-
+        
         Thread.sleep(1000);
       }
       
-      if (core != null) {
-        // small safety net for any updates that started with state that
-        // kept it from sending the update to be buffered -
-        // pause for a while to let any outstanding updates finish
-
-        Thread.sleep(2000);
-        
-        UpdateRequestProcessorChain processorChain = core
-            .getUpdateProcessingChain(SolrPluginUtils.resolveUpdateChainParam(
-                params, log));
-
-        ModifiableSolrParams reqParams = new ModifiableSolrParams(
-            req.getParams());
-        reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
-        
-        SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
-        UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
-            new SolrQueryResponse());
-        CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+      // small safety net for any updates that started with state that
+      // kept it from sending the update to be buffered -
+      // pause for a while to let any outstanding updates finish
+      
+      Thread.sleep(4000);
+      
+      UpdateRequestProcessorChain processorChain = core
+          .getUpdateProcessingChain(SolrPluginUtils.resolveUpdateChainParam(
+              params, log));
+      
+      ModifiableSolrParams reqParams = new ModifiableSolrParams(req.getParams());
+      reqParams.set(DistributedUpdateProcessor.COMMIT_END_POINT, "true");
+      
+      SolrQueryRequest sqr = new LocalSolrQueryRequest(core, reqParams);
+      UpdateRequestProcessor processor = processorChain.createProcessor(sqr,
+          new SolrQueryResponse());
+      CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+      
+      processor.processCommit(cuc);
+      processor.finish();
+      
+      // nocommit
+//      try {
+//        RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+//        SolrIndexSearcher searcher = searchHolder.get();
+//        try {
+//          System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " to replicate "
+//              + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration()  + " data:" + core.getDataDir());
+//        } finally {
+//          searchHolder.decref();
+//        }
+//      } catch (Exception e) {
+//        
+//      }
+      
+      try {
+        RefCounted<SolrIndexSearcher> searchHolder = core.getSearcher();
+        SolrIndexSearcher searcher = searchHolder.get();
+        try {
+          System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " to replicate (2) "
+              + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+        } finally {
+          searchHolder.decref();
+        }
+      } catch (Exception e) {
         
-        processor.processCommit(cuc);
-        processor.finish();
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find core:  "
-            + core);
       }
+      
     } finally {
       if (core != null) {
         core.close();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri Jan 13 15:38:01 2012
@@ -737,7 +737,6 @@ public class UpdateLog implements Plugin
     versionInfo.blockUpdates();
     try {
       if (state != State.BUFFERING) return null;
-      state = State.APPLYING_BUFFERED;
 
       // handle case when no log was even created because no updates
       // were received.
@@ -745,13 +744,14 @@ public class UpdateLog implements Plugin
         state = State.ACTIVE;
         return null;
       }
-
+      tlog.incref();
+      state = State.APPLYING_BUFFERED;
     } finally {
       versionInfo.unblockUpdates();
     }
 
-    tlog.incref();
     if (recoveryExecutor.isShutdown()) {
+      tlog.decref();
       throw new RuntimeException("executor is not running...");
     }
     ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkey.java Fri Jan 13 15:38:01 2012
@@ -45,6 +45,8 @@ import org.mortbay.jetty.servlet.FilterH
  */
 public class ChaosMonkey {
 
+  private static final int CONLOSS_PERCENT = 3; //30%
+  private static final int EXPIRE_PERCENT = 4; //40%
   private static final boolean DONTKILLLEADER = true;
   private Map<String,List<CloudJettyRunner>> shardToJetty;
   
@@ -361,11 +363,11 @@ public class ChaosMonkey {
             
             int rnd = random.nextInt(10);
 
-            if (expireSessions && rnd < 3) {
+            if (expireSessions && rnd < EXPIRE_PERCENT) {
               expireRandomSession();
             } 
             
-            if (causeConnectionLoss && rnd < 2) {
+            if (causeConnectionLoss && rnd < CONLOSS_PERCENT) {
               randomConnectionLoss();
               randomConnectionLoss();
             }

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Fri Jan 13 15:38:01 2012
@@ -559,8 +559,9 @@ public class FullSolrCloudTest extends A
               + " live:"
               + cloudState.liveNodesContain(shard.getValue().get(
                   ZkStateReader.NODE_NAME_PROP)));
-          if (shard.getValue().get(ZkStateReader.STATE_PROP)
-              .equals(ZkStateReader.RECOVERING)
+          String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+          if ((state.equals(ZkStateReader.RECOVERING) || state
+              .equals(ZkStateReader.DOWN))
               && cloudState.liveNodesContain(shard.getValue().get(
                   ZkStateReader.NODE_NAME_PROP))) {
             sawLiveRecovering = true;

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Fri Jan 13 15:38:01 2012
@@ -92,17 +92,28 @@ public class CoreAdminRequest extends So
   
   public static class PrepRecovery extends CoreAdminRequest {
     protected String nodeName;
-
+    protected String coreNodeName;
 
     public PrepRecovery() {
       action = CoreAdminAction.PREPRECOVERY;
     }
     
-    public void setNodeName(String nodeName) { this.nodeName = nodeName; }
-
-    public String getNodeName() { return nodeName; }
-
-
+    public void setNodeName(String nodeName) {
+      this.nodeName = nodeName;
+    }
+    
+    public String getNodeName() {
+      return nodeName;
+    }
+    
+    public String getCoreNodeName() {
+      return coreNodeName;
+    }
+    
+    public void setCoreNodeName(String coreNodeName) {
+      this.coreNodeName = coreNodeName;
+    }
+    
     @Override
     public SolrParams getParams() {
       if( action == null ) {
@@ -116,9 +127,14 @@ public class CoreAdminRequest extends So
       if (nodeName != null) {
         params.set( "nodeName", nodeName);
       }
+      
+      if (coreNodeName != null) {
+        params.set( "coreNodeName", coreNodeName);
+      }
 
       return params;
     }
+
   }
   
   public static class RequestRecovery extends CoreAdminRequest {

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Jan 13 15:38:01 2012
@@ -62,6 +62,7 @@ public class ZkStateReader {
   public static final String RECOVERING = "recovering";
   public static final String RECOVERY_FAILED = "recovery_failed";
   public static final String ACTIVE = "active";
+  public static final String DOWN = "down";
   
   private volatile CloudState cloudState;
 
@@ -165,50 +166,49 @@ public class ZkStateReader {
     // We need to fetch the current cluster state and the set of live nodes
     
     synchronized (getUpdateLock()) {
-     cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
-
-    
-    
-    log.info("Updating cluster state from ZooKeeper... ");
-
-    zkClient.exists(CLUSTER_STATE, new Watcher() {
-          
-          @Override
-          public void process(WatchedEvent event) {
-            log.info("A cluster state change has occurred");
-            try {
+      cmdExecutor.ensureExists(CLUSTER_STATE, zkClient);
+      
+      log.info("Updating cluster state from ZooKeeper... ");
+      
+      zkClient.exists(CLUSTER_STATE, new Watcher() {
+        
+        @Override
+        public void process(WatchedEvent event) {
+          log.info("A cluster state change has occurred");
+          try {
+            
+            // delayed approach
+            // ZkStateReader.this.updateCloudState(false, false);
+            synchronized (ZkStateReader.this.getUpdateLock()) {
+              // remake watch
+              final Watcher thisWatch = this;
+              byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null,
+                  true);
               
-              // delayed approach
-              // ZkStateReader.this.updateCloudState(false, false);
-              synchronized (ZkStateReader.this.getUpdateLock()) {
-                // remake watch
-                final Watcher thisWatch = this;
-                byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, null, true);
-                
-                CloudState clusterState = CloudState.load(data,
-                    ZkStateReader.this.cloudState.getLiveNodes());
-                // update volatile
-                cloudState = clusterState;
-              }
-            } catch (KeeperException e) {
-              if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                return;
-              }
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-              log.warn("", e);
+              CloudState clusterState = CloudState.load(data,
+                  ZkStateReader.this.cloudState.getLiveNodes());
+              // update volatile
+              cloudState = clusterState;
+            }
+          } catch (KeeperException e) {
+            if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
               return;
-            } 
+            }
+            log.error("", e);
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (InterruptedException e) {
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+            log.warn("", e);
+            return;
           }
-          
-        }, true);
-      }
+        }
+        
+      }, true);
+    }
    
     
     synchronized (ZkStateReader.this.getUpdateLock()) {

Modified: lucene/dev/branches/solrcloud/solr/testlogging.properties
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/testlogging.properties?rev=1231134&r1=1231133&r2=1231134&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/testlogging.properties (original)
+++ lucene/dev/branches/solrcloud/solr/testlogging.properties Fri Jan 13 15:38:01 2012
@@ -1,4 +1,4 @@
 handlers=java.util.logging.ConsoleHandler
-.level=WARNING
+.level=SEVERE
 java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter