You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/07/28 20:46:23 UTC
svn commit: r1614120 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/CHANGES.txt solr/core/
solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
Author: markrmiller
Date: Mon Jul 28 18:46:23 2014
New Revision: 1614120
URL: http://svn.apache.org/r1614120
Log:
SOLR-6264: Distributed commit and optimize are executed serially across all replicas.
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1614120&r1=1614119&r2=1614120&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Mon Jul 28 18:46:23 2014
@@ -155,6 +155,8 @@ Bug Fixes
* SOLR-5847: Fixed data import abort button in admin UI. (ehatcher)
+* SOLR-6264: Distributed commit and optimize are executed serially across all
+ replicas. (Mark Miller, Timothy Potter)
Optimizations
---------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1614120&r1=1614119&r2=1614120&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Jul 28 18:46:23 2014
@@ -21,7 +21,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.http.HttpResponse;
import org.apache.solr.client.solrj.SolrServer;
@@ -51,27 +59,34 @@ public class SolrCmdDistributor {
private int retryPause = 500;
private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
- private List<Error> allErrors = new ArrayList<>();
- private List<Error> errors = new ArrayList<>();
+ private final List<Error> allErrors = new ArrayList<>();
+ private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
+ private final ExecutorService updateExecutor;
+
+ private final CompletionService<Object> completionService;
+ private final Set<Future<Object>> pending = new HashSet<>();
public static interface AbortCheck {
public boolean abortCheck();
}
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
- servers = new StreamingSolrServers(updateShardHandler);
+ this.servers = new StreamingSolrServers(updateShardHandler);
+ this.updateExecutor = updateShardHandler.getUpdateExecutor();
+ this.completionService = new ExecutorCompletionService<>(updateExecutor);
}
public SolrCmdDistributor(StreamingSolrServers servers, int maxRetriesOnForward, int retryPause) {
this.servers = servers;
this.maxRetriesOnForward = maxRetriesOnForward;
this.retryPause = retryPause;
+ this.updateExecutor = servers.getUpdateExecutor();
+ completionService = new ExecutorCompletionService<>(updateExecutor);
}
public void finish() {
try {
- servers.blockUntilFinished();
- doRetriesIfNeeded();
+ blockAndDoRetries();
} finally {
servers.shutdown();
}
@@ -144,12 +159,11 @@ public class SolrCmdDistributor {
servers.clearErrors();
this.errors.clear();
for (Error err : resubmitList) {
- submit(err.req);
+ submit(err.req, false);
}
if (resubmitList.size() > 0) {
- servers.blockUntilFinished();
- doRetriesIfNeeded();
+ blockAndDoRetries();
}
}
@@ -168,7 +182,7 @@ public class SolrCmdDistributor {
uReq.deleteByQuery(cmd.query);
}
- submit(new Req(cmd.toString(), node, uReq, sync));
+ submit(new Req(cmd.toString(), node, uReq, sync), false);
}
}
@@ -186,7 +200,7 @@ public class SolrCmdDistributor {
UpdateRequest uReq = new UpdateRequest();
uReq.setParams(params);
uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
- submit(new Req(cmd.toString(), node, uReq, synchronous, rrt));
+ submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
}
}
@@ -195,8 +209,7 @@ public class SolrCmdDistributor {
ModifiableSolrParams params) throws IOException {
// we need to do any retries before commit...
- servers.blockUntilFinished();
- doRetriesIfNeeded();
+ blockAndDoRetries();
UpdateRequest uReq = new UpdateRequest();
uReq.setParams(params);
@@ -206,10 +219,29 @@ public class SolrCmdDistributor {
log.debug("Distrib commit to: {} params: {}", nodes, params);
for (Node node : nodes) {
- submit(new Req(cmd.toString(), node, uReq, false));
+ submit(new Req(cmd.toString(), node, uReq, false), true);
}
}
+
+ private void blockAndDoRetries() {
+ servers.blockUntilFinished();
+
+ // wait for any async commits to complete
+ while (pending != null && pending.size() > 0) {
+ Future<Object> future = null;
+ try {
+ future = completionService.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("blockAndDoRetries interrupted", e);
+ }
+ if (future == null) break;
+ pending.remove(future);
+ }
+ doRetriesIfNeeded();
+
+ }
void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
if (cmd == null) return;
@@ -217,10 +249,9 @@ public class SolrCmdDistributor {
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
}
- private void submit(Req req) {
+ private void submit(final Req req, boolean isCommit) {
if (req.synchronous) {
- servers.blockUntilFinished();
- doRetriesIfNeeded();
+ blockAndDoRetries();
HttpSolrServer server = new HttpSolrServer(req.node.getUrl(),
servers.getHttpClient());
@@ -234,12 +265,32 @@ public class SolrCmdDistributor {
return;
}
+
if (log.isDebugEnabled()) {
log.debug("sending update to "
+ req.node.getUrl() + " retry:"
+ req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
}
+ if (isCommit) {
+ // a commit using ConcurrentUpdateSolrServer is not async,
+ // so we make it async to prevent commits from happening
+ // serially across multiple nodes
+ pending.add(completionService.submit(new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ doRequest(req);
+ return null;
+ }
+
+ }));
+ } else {
+ doRequest(req);
+ }
+ }
+
+ private void doRequest(final Req req) {
try {
SolrServer solrServer = servers.getSolrServer(req);
solrServer.request(req.uReq);
@@ -281,6 +332,7 @@ public class SolrCmdDistributor {
sb.append("; node=").append(String.valueOf(node));
return sb.toString();
}
+
public void trackRequestResult(HttpResponse resp, boolean success) {
if (rfTracker != null) {
Integer rf = null;
@@ -496,4 +548,3 @@ public class SolrCmdDistributor {
}
}
-
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java?rev=1614120&r1=1614119&r2=1614120&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java Mon Jul 28 18:46:23 2014
@@ -123,4 +123,8 @@ public class StreamingSolrServers {
public HttpClient getHttpClient() {
return httpClient;
}
+
+ public ExecutorService getUpdateExecutor() {
+ return updateExecutor;
+ }
}