You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/12 05:23:47 UTC

svn commit: r1230393 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/cloud/ java/org/apache/solr/handler/admin/ java/org/apache/solr/update/ test/org/apache/solr/cloud/

Author: markrmiller
Date: Thu Jan 12 04:23:46 2012
New Revision: 1230393

URL: http://svn.apache.org/viewvc?rev=1230393&view=rev
Log:
Don't advertise as active if multiple recoveries have lined up; also includes other small tweaks.

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySolrCloudTest.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1230393&r1=1230392&r2=1230393&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Thu Jan 12 04:23:46 2012
@@ -102,8 +102,13 @@ public  class LeaderElector {
           break;
         }
       }
+      int index = i - 2;
+      if (index < 0) {
+        log.warn("Our node is no longer in line to be leader");
+        return;
+      }
       try {
-        zkClient.getData(holdElectionPath + "/" + seqs.get(i - 2),
+        zkClient.getData(holdElectionPath + "/" + seqs.get(index),
             new Watcher() {
               
               @Override

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java?rev=1230393&r1=1230392&r2=1230393&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/RecoveryStrat.java Thu Jan 12 04:23:46 2012
@@ -20,7 +20,6 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
@@ -52,10 +51,6 @@ public class RecoveryStrat {
 
   private volatile boolean close = false;
   
-  private final AtomicInteger recoveryAttempts = new AtomicInteger();
-  private final AtomicInteger recoverySuccesses = new AtomicInteger();
-  
-  
   // for now, just for tests
   public interface RecoveryListener {
     public void startRecovery();
@@ -70,10 +65,7 @@ public class RecoveryStrat {
   
   // TODO: we want to be pretty noisy if we don't properly recover?
   public void recover(final SolrCore core) {
-    
-    log.info("Start recovery process");
-    if (recoveryListener != null) recoveryListener.startRecovery();
-    
+   
     final ZkController zkController = core.getCoreDescriptor()
         .getCoreContainer().getZkController();
     final ZkStateReader zkStateReader = zkController.getZkStateReader();
@@ -82,11 +74,22 @@ public class RecoveryStrat {
         + core.getName();
     final CloudDescriptor cloudDesc = core.getCoreDescriptor()
         .getCloudDescriptor();
+ 
+    core.getUpdateHandler().getSolrCoreState().recoveryRequests.incrementAndGet();
+    try {
+      log.info("Start recovery process");
+      if (recoveryListener != null) recoveryListener.startRecovery();
+
+      zkController.publishAsRecoverying(baseUrl, cloudDesc, shardZkNodeName,
+          core.getName());
+    } catch (Exception e) {
+      log.error("", e);
+      core.getUpdateHandler().getSolrCoreState().recoveryRequests.decrementAndGet();
+      recoveryFailed(core, zkController, baseUrl, shardZkNodeName,
+          cloudDesc);
+      return;
+    }
     
-    zkController.publishAsRecoverying(baseUrl, cloudDesc, shardZkNodeName, core.getName());
-    
-    // TODO: we should really track if a recovery is already attempting and if it is, interrupt it
-    // and start again...
     Thread thread = new Thread() {
       {
         setDaemon(true);
@@ -99,14 +102,11 @@ public class RecoveryStrat {
           UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
           if (ulog == null) return;
 
-          // TODO: consider any races issues here
-          // was checking state first, but there is a race..
           ulog.bufferUpdates();  
           boolean replayed = false;
           boolean succesfulRecovery = false;
           int retries = 0;
           while (!succesfulRecovery && !close) {
-            recoveryAttempts.incrementAndGet();
             try {
               ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
                   cloudDesc.getCollectionName(), cloudDesc.getShardId());
@@ -116,13 +116,16 @@ public class RecoveryStrat {
               replay(core);
               replayed = true;
               
-              zkController
-                  .publishAsActive(baseUrl, cloudDesc, shardZkNodeName, core.getName());
+              // if there are pending recovery requests, don't advert as active
+              if (core.getUpdateHandler().getSolrCoreState().recoveryRequests
+                  .get() == 1) {
+                zkController.publishAsActive(baseUrl, cloudDesc,
+                    shardZkNodeName, core.getName());
+              }
               
               if (recoveryListener != null) recoveryListener.finishedRecovery();
 
               succesfulRecovery = true;
-              recoverySuccesses.incrementAndGet();
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               log.error("Recovery was interrupted", e);
@@ -138,6 +141,9 @@ public class RecoveryStrat {
                   log.warn("", e);
                 }
               }
+              if (succesfulRecovery) {
+                core.getUpdateHandler().getSolrCoreState().recoveryRequests.decrementAndGet();
+              }
             }
             
             if (!succesfulRecovery) {
@@ -148,10 +154,8 @@ public class RecoveryStrat {
               retries++;
               if (retries >= MAX_RETRIES) {
                 // TODO: for now, give up after 10 tries - should we do more?
-                log.error("Recovery failed - I give up.");
-                zkController.publishAsRecoveryFailed(baseUrl, cloudDesc,
-                    shardZkNodeName, core.getName());
-                close = true;
+                recoveryFailed(core, zkController, baseUrl, shardZkNodeName,
+                    cloudDesc);
               }
               
               try {
@@ -183,6 +187,15 @@ public class RecoveryStrat {
     thread.start();
   }
   
+  private void recoveryFailed(final SolrCore core,
+      final ZkController zkController, final String baseUrl,
+      final String shardZkNodeName, final CloudDescriptor cloudDesc) {
+    log.error("Recovery failed - I give up.");
+    zkController.publishAsRecoveryFailed(baseUrl, cloudDesc,
+        shardZkNodeName, core.getName());
+    close = true;
+  }
+  
   private void replicate(SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
       throws SolrServerException, IOException {
     // start buffer updates to tran log
@@ -217,7 +230,8 @@ public class RecoveryStrat {
       ReplicationHandler replicationHandler = (ReplicationHandler) handler;
       
       if (replicationHandler == null) {
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Skipping recovery, no /replication handler found");
+        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+            "Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
       }
       
       ModifiableSolrParams solrParams = new ModifiableSolrParams();
@@ -236,12 +250,4 @@ public class RecoveryStrat {
   public void setRecoveryListener(RecoveryListener recoveryListener) {
     this.recoveryListener = recoveryListener;
   }
-
-  public AtomicInteger getRecoveryAttempts() {
-    return recoveryAttempts;
-  }
-
-  public AtomicInteger getRecoverySuccesses() {
-    return recoverySuccesses;
-  }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1230393&r1=1230392&r2=1230393&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Thu Jan 12 04:23:46 2012
@@ -622,10 +622,10 @@ public class CoreAdminHandler extends Re
           break;
         }
         
-        if (retry++ == 10) {
+        if (retry++ == 30) {
           throw new SolrException(ErrorCode.BAD_REQUEST,
               "I was asked to prep for recovery for " + nodeName
-                  + " but she is not in a recovery state");
+                  + " but she is not in a recovery state - state: " + state);
         }
 
         Thread.sleep(1000);
@@ -636,7 +636,7 @@ public class CoreAdminHandler extends Re
         // kept it from sending the update to be buffered -
         // pause for a while to let any outstanding updates finish
 
-        Thread.sleep(1500);
+        Thread.sleep(2000);
         
         UpdateRequestProcessorChain processorChain = core
             .getUpdateProcessingChain(SolrPluginUtils.resolveUpdateChainParam(

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java?rev=1230393&r1=1230392&r2=1230393&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCoreState.java Thu Jan 12 04:23:46 2012
@@ -18,6 +18,7 @@ package org.apache.solr.update;
  */
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.index.IndexWriter;
 import org.apache.solr.core.DirectoryFactory;
@@ -33,6 +34,8 @@ public abstract class SolrCoreState {
   // need a per core lock over reloads...
   private final Object recoveryLock = new Object();
   
+  public final AtomicInteger recoveryRequests = new AtomicInteger();
+  
   /**
    * Force the creation of a new IndexWriter using the settings from the given
    * SolrCore.

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySolrCloudTest.java?rev=1230393&r1=1230392&r2=1230393&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySolrCloudTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySolrCloudTest.java Thu Jan 12 04:23:46 2012
@@ -82,6 +82,10 @@ public class ChaosMonkeySolrCloudTest ex
     handle.put("QTime", SKIPVAL);
     handle.put("timestamp", SKIPVAL);
     
+    // we cannot do delete by query
+    // as it's not supported for recovery
+    //del("*:*");
+    
     List<StopableIndexingThread> threads = new ArrayList<StopableIndexingThread>();
     int threadCount = atLeast(2);
     for (int i = 0; i < threadCount; i++) {