You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2014/07/28 20:45:24 UTC

svn commit: r1614118 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/update/SolrCmdDistributor.java core/src/java/org/apache/solr/update/StreamingSolrServers.java

Author: markrmiller
Date: Mon Jul 28 18:45:23 2014
New Revision: 1614118

URL: http://svn.apache.org/r1614118
Log:
SOLR-6264: Distributed commit and optimize are executed serially across all replicas.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1614118&r1=1614117&r2=1614118&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Jul 28 18:45:23 2014
@@ -224,6 +224,8 @@ Bug Fixes
 
 * SOLR-5847: Fixed data import abort button in admin UI. (ehatcher)
 
+* SOLR-6264: Distributed commit and optimize are executed serially across all 
+  replicas. (Mark Miller, Timothy Potter)
 
 Optimizations
 ---------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1614118&r1=1614117&r2=1614118&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Jul 28 18:45:23 2014
@@ -21,7 +21,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.ConnectException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.http.HttpResponse;
 import org.apache.solr.client.solrj.SolrServer;
@@ -51,27 +59,34 @@ public class SolrCmdDistributor {
   private int retryPause = 500;
   private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
   
-  private List<Error> allErrors = new ArrayList<>();
-  private List<Error> errors = new ArrayList<>();
+  private final List<Error> allErrors = new ArrayList<>();
+  private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
+  private final ExecutorService updateExecutor;
+  
+  private final CompletionService<Object> completionService;
+  private final Set<Future<Object>> pending = new HashSet<>();
   
   public static interface AbortCheck {
     public boolean abortCheck();
   }
   
   public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
-    servers = new StreamingSolrServers(updateShardHandler);
+    this.servers = new StreamingSolrServers(updateShardHandler);
+    this.updateExecutor = updateShardHandler.getUpdateExecutor();
+    this.completionService = new ExecutorCompletionService<>(updateExecutor);
   }
   
   public SolrCmdDistributor(StreamingSolrServers servers, int maxRetriesOnForward, int retryPause) {
     this.servers = servers;
     this.maxRetriesOnForward = maxRetriesOnForward;
     this.retryPause = retryPause;
+    this.updateExecutor = servers.getUpdateExecutor();
+    completionService = new ExecutorCompletionService<>(updateExecutor);
   }
   
   public void finish() {    
     try {
-      servers.blockUntilFinished();
-      doRetriesIfNeeded();
+      blockAndDoRetries();
     } finally {
       servers.shutdown();
     }
@@ -144,12 +159,11 @@ public class SolrCmdDistributor {
     servers.clearErrors();
     this.errors.clear();
     for (Error err : resubmitList) {
-      submit(err.req);
+      submit(err.req, false);
     }
     
     if (resubmitList.size() > 0) {
-      servers.blockUntilFinished();
-      doRetriesIfNeeded();
+      blockAndDoRetries();
     }
   }
   
@@ -168,7 +182,7 @@ public class SolrCmdDistributor {
         uReq.deleteByQuery(cmd.query);
       }
       
-      submit(new Req(cmd.toString(), node, uReq, sync));
+      submit(new Req(cmd.toString(), node, uReq, sync), false);
     }
   }
   
@@ -186,7 +200,7 @@ public class SolrCmdDistributor {
       UpdateRequest uReq = new UpdateRequest();
       uReq.setParams(params);
       uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-      submit(new Req(cmd.toString(), node, uReq, synchronous, rrt));
+      submit(new Req(cmd.toString(), node, uReq, synchronous, rrt), false);
     }
     
   }
@@ -195,8 +209,7 @@ public class SolrCmdDistributor {
       ModifiableSolrParams params) throws IOException {
     
     // we need to do any retries before commit...
-    servers.blockUntilFinished();
-    doRetriesIfNeeded();
+    blockAndDoRetries();
     
     UpdateRequest uReq = new UpdateRequest();
     uReq.setParams(params);
@@ -206,10 +219,29 @@ public class SolrCmdDistributor {
     log.debug("Distrib commit to: {} params: {}", nodes, params);
     
     for (Node node : nodes) {
-      submit(new Req(cmd.toString(), node, uReq, false));
+      submit(new Req(cmd.toString(), node, uReq, false), true);
     }
     
   }
+
+  private void blockAndDoRetries() {
+    servers.blockUntilFinished();
+    
+    // wait for any async commits to complete
+    while (pending != null && pending.size() > 0) {
+      Future<Object> future = null;
+      try {
+        future = completionService.take();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.error("blockAndDoRetries interrupted", e);
+      }
+      if (future == null) break;
+      pending.remove(future);
+    }
+    doRetriesIfNeeded();
+
+  }
   
   void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
     if (cmd == null) return;
@@ -217,10 +249,9 @@ public class SolrCmdDistributor {
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
   }
 
-  private void submit(Req req) {
+  private void submit(final Req req, boolean isCommit) {
     if (req.synchronous) {
-      servers.blockUntilFinished();
-      doRetriesIfNeeded();
+      blockAndDoRetries();
       
       HttpSolrServer server = new HttpSolrServer(req.node.getUrl(),
           servers.getHttpClient());
@@ -234,12 +265,32 @@ public class SolrCmdDistributor {
       
       return;
     }
+    
     if (log.isDebugEnabled()) {
       log.debug("sending update to "
           + req.node.getUrl() + " retry:"
           + req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
     }
     
+    if (isCommit) {
+      // a commit using ConcurrentUpdateSolrServer is not async,
+      // so we make it async to prevent commits from happening
+      // serially across multiple nodes
+      pending.add(completionService.submit(new Callable<Object>() {
+        
+        @Override
+        public Object call() throws Exception {
+          doRequest(req);
+          return null;
+        }
+
+      }));
+    } else {
+      doRequest(req);
+    }
+  }
+  
+  private void doRequest(final Req req) {
     try {
       SolrServer solrServer = servers.getSolrServer(req);
       solrServer.request(req.uReq);
@@ -281,6 +332,7 @@ public class SolrCmdDistributor {
       sb.append("; node=").append(String.valueOf(node));
       return sb.toString();
     }
+    
     public void trackRequestResult(HttpResponse resp, boolean success) {      
       if (rfTracker != null) {
         Integer rf = null;
@@ -496,4 +548,3 @@ public class SolrCmdDistributor {
   }
 }
 
-

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java?rev=1614118&r1=1614117&r2=1614118&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java Mon Jul 28 18:45:23 2014
@@ -123,4 +123,8 @@ public class StreamingSolrServers {
   public HttpClient getHttpClient() {
     return httpClient;
   }
+  
+  public ExecutorService getUpdateExecutor() {
+    return updateExecutor;
+  }
 }