You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/03/24 04:05:04 UTC

svn commit: r1580714 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/cloud/RecoveryStrategy.java solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java

Author: markrmiller
Date: Mon Mar 24 03:05:04 2014
New Revision: 1580714

URL: http://svn.apache.org/r1580714
Log:
SOLR-5884: When recovery is cancelled, any call to the leader to wait to see the replica in the right state for recovery should be aborted.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1580714&r1=1580713&r2=1580714&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Mar 24 03:05:04 2014
@@ -212,6 +212,8 @@ Optimizations
   index has not changed.  This reduces overhead in situations such as deletes that 
   do not modify the index, and/or redundant commits. (hossman)
   
+* SOLR-5884: When recovery is cancelled, any call to the leader to wait to see
+  the replica in the right state for recovery should be aborted. (Mark Miller)
 
 Other Changes
 ---------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1580714&r1=1580713&r2=1580714&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Mon Mar 24 03:05:04 2014
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer.HttpUriRequestResponse;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -89,6 +91,7 @@ public class RecoveryStrategy extends Th
   private int retries;
   private boolean recoveringAfterStartup;
   private CoreContainer cc;
+  private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
   
   public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
     this.cc = cc;
@@ -109,7 +112,12 @@ public class RecoveryStrategy extends Th
   @Override
   public void close() {
     close = true;
-    log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName );
+    try {
+      prevSendPreRecoveryHttpUriRequest.abort();
+    } catch (NullPointerException e) {
+      // okay
+    }
+    log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName);
   }
 
   
@@ -206,27 +214,6 @@ public class RecoveryStrategy extends Th
     }
   }
 
-  private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
-      throws SolrServerException, IOException {
-    HttpSolrServer server = new HttpSolrServer(leaderBaseUrl);
-    try {
-      server.setConnectionTimeout(30000);
-      WaitForState prepCmd = new WaitForState();
-      prepCmd.setCoreName(leaderCoreName);
-      prepCmd.setNodeName(zkController.getNodeName());
-      prepCmd.setCoreNodeName(coreZkNodeName);
-      prepCmd.setState(ZkStateReader.RECOVERING);
-      prepCmd.setCheckLive(true);
-      prepCmd.setOnlyIfLeader(true);
-      if (!Slice.CONSTRUCTION.equals(slice.getState()) && !Slice.RECOVERY.equals(slice.getState())) {
-        prepCmd.setOnlyIfLeaderActive(true);
-      }
-      server.request(prepCmd);
-    } finally {
-      server.shutdown();
-    }
-  }
-
   @Override
   public void run() {
     SolrCore core = cc.getCore(coreName);
@@ -345,8 +332,8 @@ public class RecoveryStrategy extends Th
         ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
             cloudDesc.getCollectionName(), cloudDesc.getShardId());
       
-        String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
+        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
 
         String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
 
@@ -367,9 +354,26 @@ public class RecoveryStrategy extends Th
         zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
         
         
-        Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+        final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+        try {
+          prevSendPreRecoveryHttpUriRequest.abort();
+        } catch (NullPointerException e) {
+          // okay
+        }
+        
+        if (isClosed()) {
+          log.info("Recovery was cancelled");
+          break;
+        }
+
         sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
         
+        if (isClosed()) {
+          log.info("Recovery was cancelled");
+          break;
+        }
+        
         // we wait a bit so that any updates on the leader
         // that started before they saw recovering state 
         // are sure to have finished
@@ -426,6 +430,11 @@ public class RecoveryStrategy extends Th
           log.info("PeerSync Recovery was not successful - trying replication. core=" + coreName);
         }
 
+        if (isClosed()) {
+          log.info("Recovery was cancelled");
+          break;
+        }
+        
         log.info("Starting Replication Recovery. core=" + coreName);
         
         log.info("Begin buffering updates. core=" + coreName);
@@ -436,8 +445,18 @@ public class RecoveryStrategy extends Th
 
           replicate(zkController.getNodeName(), core, leaderprops);
 
+          if (isClosed()) {
+            log.info("Recovery was cancelled");
+            break;
+          }
+          
           replay(core);
           replayed = true;
+          
+          if (isClosed()) {
+            log.info("Recovery was cancelled");
+            break;
+          }
 
           log.info("Replication Recovery was successful - registering as Active. core=" + coreName);
           // if there are pending recovery requests, don't advert as active
@@ -571,5 +590,28 @@ public class RecoveryStrategy extends Th
   public boolean isClosed() {
     return close;
   }
+  
+  private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
+      throws SolrServerException, IOException, InterruptedException, ExecutionException {
+    HttpSolrServer server = new HttpSolrServer(leaderBaseUrl);
+    try {
+      server.setConnectionTimeout(30000);
+      WaitForState prepCmd = new WaitForState();
+      prepCmd.setCoreName(leaderCoreName);
+      prepCmd.setNodeName(zkController.getNodeName());
+      prepCmd.setCoreNodeName(coreZkNodeName);
+      prepCmd.setState(ZkStateReader.RECOVERING);
+      prepCmd.setCheckLive(true);
+      prepCmd.setOnlyIfLeader(true);
+      if (!Slice.CONSTRUCTION.equals(slice.getState()) && !Slice.RECOVERY.equals(slice.getState())) {
+        prepCmd.setOnlyIfLeaderActive(true);
+      }
+      HttpUriRequestResponse mrr = server.httpUriRequest(prepCmd);
+      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
+      mrr.future.get();
+    } finally {
+      server.shutdown();
+    }
+  }
 
 }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java?rev=1580714&r1=1580713&r2=1580714&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java Mon Mar 24 03:05:04 2014
@@ -28,6 +28,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.http.Header;
@@ -40,6 +44,7 @@ import org.apache.http.client.entity.Url
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.params.ClientPNames;
 import org.apache.http.conn.ClientConnectionManager;
 import org.apache.http.entity.ContentType;
@@ -203,6 +208,49 @@ public class HttpSolrServer extends Solr
     return executeMethod(createMethod(request),processor);
   }
   
+  /**
+   * @lucene.experimental
+   */
+  public static class HttpUriRequestResponse {
+    public HttpUriRequest httpUriRequest;
+    public Future<NamedList<Object>> future;
+  }
+  
+  /**
+   * @lucene.experimental
+   */
+  public HttpUriRequestResponse httpUriRequest(final SolrRequest request)
+      throws SolrServerException, IOException {
+    ResponseParser responseParser = request.getResponseParser();
+    if (responseParser == null) {
+      responseParser = parser;
+    }
+    return httpUriRequest(request, responseParser);
+  }
+  
+  /**
+   * @lucene.experimental
+   */
+  public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
+    HttpUriRequestResponse mrr = new HttpUriRequestResponse();
+    final HttpRequestBase method = createMethod(request);
+    ExecutorService pool = Executors.newFixedThreadPool(1);
+    try {
+      mrr.future = pool.submit(new Callable<NamedList<Object>>(){
+
+        @Override
+        public NamedList<Object> call() throws Exception {
+          return executeMethod(method, processor);
+        }});
+ 
+    } finally {
+      pool.shutdownNow();
+    }
+    assert method != null;
+    mrr.httpUriRequest = method;
+    return mrr;
+  }
+  
   protected HttpRequestBase createMethod(final SolrRequest request) throws IOException, SolrServerException {
     HttpRequestBase method = null;
     InputStream is = null;