You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/03/24 04:05:04 UTC
svn commit: r1580714 - in /lucene/dev/trunk/solr: CHANGES.txt
core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
Author: markrmiller
Date: Mon Mar 24 03:05:04 2014
New Revision: 1580714
URL: http://svn.apache.org/r1580714
Log:
SOLR-5884: When recovery is cancelled, any call to the leader to wait to see the replica in the right state for recovery should be aborted.
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1580714&r1=1580713&r2=1580714&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Mar 24 03:05:04 2014
@@ -212,6 +212,8 @@ Optimizations
index has not changed. This reduces overhead in situations such as deletes that
do not modify the index, and/or redundant commits. (hossman)
+* SOLR-5884: When recovery is cancelled, any call to the leader to wait to see
+ the replica in the right state for recovery should be aborted. (Mark Miller)
Other Changes
---------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1580714&r1=1580713&r2=1580714&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Mon Mar 24 03:05:04 2014
@@ -25,10 +25,12 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -89,6 +91,7 @@ public class RecoveryStrategy extends Th
private int retries;
private boolean recoveringAfterStartup;
private CoreContainer cc;
+ private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
@@ -109,7 +112,12 @@ public class RecoveryStrategy extends Th
@Override
public void close() {
close = true;
- log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName );
+ // Abort the most recently recorded prep-recovery request to the leader, if any.
+ // The volatile field is read once so the null check and abort() act on the same
+ // reference; an explicit check replaces catching NullPointerException, which
+ // would also have hidden an NPE raised inside abort() itself.
+ final HttpUriRequest pendingPrepRequest = prevSendPreRecoveryHttpUriRequest;
+ if (pendingPrepRequest != null) {
+ pendingPrepRequest.abort();
+ }
+ log.warn("Stopping recovery for zkNodeName=" + coreZkNodeName + "core=" + coreName);
}
@@ -206,27 +214,6 @@ public class RecoveryStrategy extends Th
}
}
- private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
- throws SolrServerException, IOException {
- HttpSolrServer server = new HttpSolrServer(leaderBaseUrl);
- try {
- server.setConnectionTimeout(30000);
- WaitForState prepCmd = new WaitForState();
- prepCmd.setCoreName(leaderCoreName);
- prepCmd.setNodeName(zkController.getNodeName());
- prepCmd.setCoreNodeName(coreZkNodeName);
- prepCmd.setState(ZkStateReader.RECOVERING);
- prepCmd.setCheckLive(true);
- prepCmd.setOnlyIfLeader(true);
- if (!Slice.CONSTRUCTION.equals(slice.getState()) && !Slice.RECOVERY.equals(slice.getState())) {
- prepCmd.setOnlyIfLeaderActive(true);
- }
- server.request(prepCmd);
- } finally {
- server.shutdown();
- }
- }
-
@Override
public void run() {
SolrCore core = cc.getCore(coreName);
@@ -345,8 +332,8 @@ public class RecoveryStrategy extends Th
ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
- String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
- String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
+ final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+ final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
@@ -367,9 +354,26 @@ public class RecoveryStrategy extends Th
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
- Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+ final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+ try {
+ prevSendPreRecoveryHttpUriRequest.abort();
+ } catch (NullPointerException e) {
+ // okay
+ }
+
+ if (isClosed()) {
+ log.info("Recovery was cancelled");
+ break;
+ }
+
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+ if (isClosed()) {
+ log.info("Recovery was cancelled");
+ break;
+ }
+
// we wait a bit so that any updates on the leader
// that started before they saw recovering state
// are sure to have finished
@@ -426,6 +430,11 @@ public class RecoveryStrategy extends Th
log.info("PeerSync Recovery was not successful - trying replication. core=" + coreName);
}
+ if (isClosed()) {
+ log.info("Recovery was cancelled");
+ break;
+ }
+
log.info("Starting Replication Recovery. core=" + coreName);
log.info("Begin buffering updates. core=" + coreName);
@@ -436,8 +445,18 @@ public class RecoveryStrategy extends Th
replicate(zkController.getNodeName(), core, leaderprops);
+ if (isClosed()) {
+ log.info("Recovery was cancelled");
+ break;
+ }
+
replay(core);
replayed = true;
+
+ if (isClosed()) {
+ log.info("Recovery was cancelled");
+ break;
+ }
log.info("Replication Recovery was successful - registering as Active. core=" + coreName);
// if there are pending recovery requests, don't advert as active
@@ -571,5 +590,28 @@ public class RecoveryStrategy extends Th
public boolean isClosed() {
return close;
}
+
+ /**
+ * Sends a {@code WaitForState} command to the leader core and blocks until the
+ * call completes. The underlying HTTP request is stored in
+ * {@code prevSendPreRecoveryHttpUriRequest} before blocking, so that other code
+ * holding this object can abort it. The temporary {@code HttpSolrServer} is
+ * always shut down on exit.
+ *
+ * @param leaderBaseUrl base URL used to create the client for the leader
+ * @param leaderCoreName core name set on the command
+ * @param slice slice whose state decides whether {@code onlyIfLeaderActive} is set
+ */
+ private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
+ throws SolrServerException, IOException, InterruptedException, ExecutionException {
+ HttpSolrServer leaderClient = new HttpSolrServer(leaderBaseUrl);
+ try {
+ leaderClient.setConnectionTimeout(30000);
+
+ WaitForState waitCmd = new WaitForState();
+ waitCmd.setCoreName(leaderCoreName);
+ waitCmd.setNodeName(zkController.getNodeName());
+ waitCmd.setCoreNodeName(coreZkNodeName);
+ waitCmd.setState(ZkStateReader.RECOVERING);
+ waitCmd.setCheckLive(true);
+ waitCmd.setOnlyIfLeader(true);
+ // only require an active leader when the slice is neither in
+ // CONSTRUCTION nor in RECOVERY state
+ if (!(Slice.CONSTRUCTION.equals(slice.getState()) || Slice.RECOVERY.equals(slice.getState()))) {
+ waitCmd.setOnlyIfLeaderActive(true);
+ }
+
+ HttpUriRequestResponse pending = leaderClient.httpUriRequest(waitCmd);
+ // publish the request before blocking so it can be aborted from elsewhere
+ prevSendPreRecoveryHttpUriRequest = pending.httpUriRequest;
+ pending.future.get();
+ } finally {
+ leaderClient.shutdown();
+ }
+ }
}
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java?rev=1580714&r1=1580713&r2=1580714&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java Mon Mar 24 03:05:04 2014
@@ -28,6 +28,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
@@ -40,6 +44,7 @@ import org.apache.http.client.entity.Url
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.entity.ContentType;
@@ -203,6 +208,49 @@ public class HttpSolrServer extends Solr
return executeMethod(createMethod(request),processor);
}
+ /**
+ * Holder returned by the {@code httpUriRequest} methods of the enclosing class:
+ * it pairs the HTTP request object that was created with the future of the
+ * result of executing it.
+ *
+ * @lucene.experimental
+ */
+ public static class HttpUriRequestResponse {
+ // the HTTP method object created for the Solr request
+ public HttpUriRequest httpUriRequest;
+ // future for the NamedList produced by executing that request
+ public Future<NamedList<Object>> future;
+ }
+
+ /**
+ * Convenience overload that chooses the response parser itself: the request's
+ * own parser when it supplies one, otherwise this server's {@code parser}, and
+ * then delegates to the two-argument {@code httpUriRequest}.
+ *
+ * @lucene.experimental
+ */
+ public HttpUriRequestResponse httpUriRequest(final SolrRequest request)
+ throws SolrServerException, IOException {
+ final ResponseParser requestParser = request.getResponseParser();
+ return httpUriRequest(request, requestParser != null ? requestParser : parser);
+ }
+
+ /**
+ * Creates the HTTP method for {@code request} and submits its execution to a
+ * dedicated single-thread executor, returning without waiting for the response.
+ * The caller gets both the HTTP request object and the future of the result of
+ * {@code executeMethod}.
+ *
+ * @param request the Solr request to turn into an HTTP method
+ * @param processor the response parser passed to {@code executeMethod}
+ * @return holder with the created HTTP request and the future for its result
+ * @throws SolrServerException if creating the HTTP method fails with it
+ * @throws IOException if creating the HTTP method fails with it
+ *
+ * @lucene.experimental
+ */
+ public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
+ HttpUriRequestResponse mrr = new HttpUriRequestResponse();
+ final HttpRequestBase method = createMethod(request);
+ ExecutorService pool = Executors.newFixedThreadPool(1);
+ try {
+ mrr.future = pool.submit(new Callable<NamedList<Object>>(){
+
+ @Override
+ public NamedList<Object> call() throws Exception {
+ return executeMethod(method, processor);
+ }});
+
+ } finally {
+ // Orderly shutdown: the task submitted above still runs to completion and
+ // the worker thread exits afterwards. shutdownNow() would interrupt the
+ // worker that is about to execute the request.
+ pool.shutdown();
+ }
+ assert method != null;
+ mrr.httpUriRequest = method;
+ return mrr;
+ }
+
protected HttpRequestBase createMethod(final SolrRequest request) throws IOException, SolrServerException {
HttpRequestBase method = null;
InputStream is = null;