You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/02/14 20:15:54 UTC

svn commit: r1244177 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/cloud/RecoveryStrategy.java test/org/apache/solr/cloud/FullSolrCloudTest.java

Author: markrmiller
Date: Tue Feb 14 19:15:54 2012
New Revision: 1244177

URL: http://svn.apache.org/viewvc?rev=1244177&view=rev
Log:
SOLR-3126: We should try to do a quick sync on std start up recovery before trying to do a full blown replication.

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1244177&r1=1244176&r2=1244177&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Tue Feb 14 19:15:54 2012
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
  */
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
@@ -35,15 +36,19 @@ import org.apache.solr.core.CoreDescript
 import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.PeerSync;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RecoveryStrategy extends Thread {
-  private static final int MAX_RETRIES = 100;
-  private static final int INTERRUPTED = 101;
+  private static final int MAX_RETRIES = 500;
+  private static final int INTERRUPTED = MAX_RETRIES + 1;
   private static final int START_TIMEOUT = 100;
   
   private static final String REPLICATION_HANDLER = "/replication";
@@ -86,7 +91,7 @@ public class RecoveryStrategy extends Th
     close = true;
   }
   
-  private void replicate(String nodeName, SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
+  private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops, String baseUrl)
       throws SolrServerException, IOException {
     // start buffer updates to tran log
     // and do recovery - either replay via realtime get (eventually)
@@ -97,7 +102,7 @@ public class RecoveryStrategy extends Th
     String leaderUrl = leaderCNodeProps.getCoreUrl();
     String leaderCoreName = leaderCNodeProps.getCoreName();
     
-    log.info("Attempt to replicate from " + leaderUrl);
+    log.info("Attempting to replicate from " + leaderUrl);
     
     // if we are the leader, either we are trying to recover faster
     // then our ephemeral timed out or we are the only node
@@ -109,7 +114,7 @@ public class RecoveryStrategy extends Th
       PrepRecovery prepCmd = new PrepRecovery();
       prepCmd.setCoreName(leaderCoreName);
       prepCmd.setNodeName(nodeName);
-      prepCmd.setCoreNodeName(shardZkNodeName);
+      prepCmd.setCoreNodeName(coreZkNodeName);
       
       server.request(prepCmd);
       server.shutdown();
@@ -158,46 +163,72 @@ public class RecoveryStrategy extends Th
     boolean succesfulRecovery = false;
     
     while (!succesfulRecovery && !close && !isInterrupted()) {
-      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-      if (ulog == null) return;
-      
-      ulog.bufferUpdates();
-      replayed = false;
-      CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
       try {
+        // first thing we just try to sync
         zkController.publish(core, ZkStateReader.RECOVERING);
+        CloudDescriptor cloudDesc = core.getCoreDescriptor()
+            .getCloudDescriptor();
+        ZkNodeProps leaderprops = null;
         
-        ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
+        leaderprops = zkStateReader.getLeaderProps(
             cloudDesc.getCollectionName(), cloudDesc.getShardId());
         
-        // System.out.println("recover " + shardZkNodeName + " against " +
-        // leaderprops);
-        replicate(zkController.getNodeName(), core, coreZkNodeName,
-            leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, coreName));
-        
-        replay(ulog);
-        replayed = true;
-        
-        // if there are pending recovery requests, don't advert as active
-        zkController.publishAsActive(baseUrl, core.getCoreDescriptor(), coreZkNodeName,
-            coreName);
-        
-        succesfulRecovery = true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.warn("Recovery was interrupted", e);
-        retries = INTERRUPTED;
-      } catch (Throwable t) {
-        SolrException.log(log, "Error while trying to recover", t);
-      } finally {
-        if (!replayed) {
-          try {
-            ulog.dropBufferedUpdates();
-          } catch (Throwable t) {
-            SolrException.log(log, "", t);
+        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP));
+        
+        log.info("Attempting to PeerSync from " + leaderUrl);
+        PeerSync peerSync = new PeerSync(core,
+            Collections.singletonList(leaderUrl), 100);
+        boolean syncSuccess = peerSync.sync();
+        if (syncSuccess) {
+          SolrQueryRequest req = new LocalSolrQueryRequest(core,
+              new ModifiableSolrParams());
+          core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+          log.info("Sync Recovery was succesful - registering as Active");
+          // sync success - register as active and return
+          zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
+              coreZkNodeName, coreName);
+          return;
+        }
+        log.info("Sync Recovery was not successful - trying replication");
+        UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+        if (ulog == null) return;
+        
+        ulog.bufferUpdates();
+        replayed = false;
+        
+        try {
+          
+          replicate(zkController.getNodeName(), core,
+              leaderprops, leaderUrl);
+          
+          replay(ulog);
+          replayed = true;
+          
+          log.info("Recovery was succesful - registering as Active");
+          // if there are pending recovery requests, don't advert as active
+          zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
+              coreZkNodeName, coreName);
+          
+          succesfulRecovery = true;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.warn("Recovery was interrupted", e);
+          retries = INTERRUPTED;
+        } catch (Throwable t) {
+          SolrException.log(log, "Error while trying to recover", t);
+        } finally {
+          if (!replayed) {
+            try {
+              ulog.dropBufferedUpdates();
+            } catch (Throwable t) {
+              SolrException.log(log, "", t);
+            }
           }
+          
         }
         
+      } catch (Throwable t) {
+        SolrException.log(log, "Error while trying to recover", t);
       }
       
       if (!succesfulRecovery) {
@@ -205,14 +236,14 @@ public class RecoveryStrategy extends Th
         // TODO: we don't want to retry for some problems?
         // Or do a fall off retry...
         try {
-
+          
           SolrException.log(log, "Recovery failed - trying again...");
           retries++;
           if (retries >= MAX_RETRIES) {
             if (retries == INTERRUPTED) {
-
+              
             } else {
-              // TODO: for now, give up after 10 tries - should we do more?
+              // TODO: for now, give up after X tries - should we do more?
               recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
                   core.getCoreDescriptor());
             }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java?rev=1244177&r1=1244176&r2=1244177&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudTest.java Tue Feb 14 19:15:54 2012
@@ -698,8 +698,14 @@ public class FullSolrCloudTest extends A
   
   private void brindDownShardIndexSomeDocsAndRecover() throws Exception,
       SolrServerException, IOException, InterruptedException {
+    SolrQuery query = new SolrQuery("*:*");
+    query.set("distrib", false);
     
     commit();
+    
+    long deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
+    System.out.println("dsc:" + deadShardCount);
+    
     query("q", "*:*", "sort", "n_tl1 desc");
     
     // kill a shard
@@ -715,7 +721,6 @@ public class FullSolrCloudTest extends A
 	
     // ensure shard is dead
     try {
-      // TODO: ignore fail
       index_specific(shardToClient.get(SHARD2).get(0), id, 999, i1, 107, t1,
           "specific doc!");
       fail("This server should be down and this update should have failed");
@@ -743,6 +748,7 @@ public class FullSolrCloudTest extends A
       Thread.sleep(1000);
     }
 	
+    long numFound1 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
     
     index_specific(shardToClient.get(SHARD2).get(1), id, 1000, i1, 108, t1,
         "specific doc!");
@@ -755,8 +761,10 @@ public class FullSolrCloudTest extends A
     
     // try adding a doc with CloudSolrServer
     cloudClient.setDefaultCollection(DEFAULT_COLLECTION);
-    SolrQuery query = new SolrQuery("*:*");
-    long numFound1 = cloudClient.query(query).getResults().getNumFound();
+
+    long numFound2 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
+    
+    assertEquals(numFound1 + 1, numFound2);
     
     SolrInputDocument doc = new SolrInputDocument();
     doc.addField("id", 1001);
@@ -772,10 +780,10 @@ public class FullSolrCloudTest extends A
     
     query("q", "*:*", "sort", "n_tl1 desc");
     
-    long numFound2 = cloudClient.query(query).getResults().getNumFound();
+    long numFound3 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
     
     // lets just check that the one doc since last commit made it in...
-    assertEquals(numFound1 + 1, numFound2);
+    assertEquals(numFound2 + 1, numFound3);
     
     // test debugging
     testDebugQueries();
@@ -786,7 +794,9 @@ public class FullSolrCloudTest extends A
       
       for (SolrServer client : clients) {
         try {
-          System.out.println(client.query(new SolrQuery("*:*")).getResults()
+          SolrQuery q = new SolrQuery("*:*");
+          q.set("distrib", false);
+          System.out.println(client.query(q).getResults()
               .getNumFound());
         } catch (Exception e) {
           
@@ -798,21 +808,41 @@ public class FullSolrCloudTest extends A
     // query("q","matchesnothing","fl","*,score", "debugQuery", "true");
     
     // this should trigger a recovery phase on deadShard
-    
     deadShard.start(true);
     
-    // make sure we have published we are recoverying
+    // make sure we have published we are recovering
     Thread.sleep(1500);
     
     waitForRecoveriesToFinish(false);
     
-    List<SolrServer> s2c = shardToClient.get(SHARD2);
-    
+    deadShardCount = shardToClient.get(SHARD2).get(0).query(query).getResults().getNumFound();
     // if we properly recovered, we should now have the couple missing docs that
     // came in while shard was down
-    assertEquals(s2c.get(0).query(new SolrQuery("*:*")).getResults()
-        .getNumFound(), s2c.get(1).query(new SolrQuery("*:*")).getResults()
-        .getNumFound());
+    checkShardConsistency(true, false);
+    
+    
+    // recover over 100 docs so we do more than just peer sync (replicate recovery)
+    deadShard = chaosMonkey.stopShard(SHARD2, 0);
+    
+    for (int i = 0; i < 226; i++) {
+      doc = new SolrInputDocument();
+      doc.addField("id", 2000 + i);
+      controlClient.add(doc);
+      ureq = new UpdateRequest();
+      ureq.add(doc);
+      // ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
+      ureq.process(cloudClient);
+    }
+    commit();
+    
+    deadShard.start(true);
+    
+    // make sure we have published we are recovering
+    Thread.sleep(1500);
+    
+    waitForRecoveriesToFinish(false);
+    
+    checkShardConsistency(true, false);
   }
   
   private void testDebugQueries() throws Exception {