You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/07/22 18:54:26 UTC

svn commit: r1364355 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/update/

Author: markrmiller
Date: Sun Jul 22 16:54:26 2012
New Revision: 1364355

URL: http://svn.apache.org/viewvc?rev=1364355&view=rev
Log:
SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create spikes of threads in the thousands. 

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sun Jul 22 16:54:26 2012
@@ -117,6 +117,9 @@ Bug Fixes
 
 * SOLR-3660: Velocity: Link to admin page broken (janhoy)
 
+* SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create 
+  spikes of threads in the thousands. (yonik, Mark Miller)
+
 Other Changes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sun Jul 22 16:54:26 2012
@@ -24,12 +24,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -49,17 +53,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-
-
 public class SolrCmdDistributor {
   private static final int MAX_RETRIES_ON_FORWARD = 6;
   public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
   
   // TODO: shut this thing down
   // TODO: this cannot be per instance...
-  static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
-      Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-      new DefaultSolrThreadFactory("cmdDistribExecutor"));
+  static BoundedExecutor commExecutor;
 
   static final HttpClient client;
   
@@ -91,8 +91,22 @@ public class SolrCmdDistributor {
     ModifiableSolrParams params;
   }
   
-  public SolrCmdDistributor() {
-   
+  public SolrCmdDistributor(int numHosts) {
+
+    BoundedExecutor executor = null;
+    synchronized (SolrCmdDistributor.class) {
+      if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) {
+        // we don't shutdown the previous because all it's threads will die
+        int maxPoolSize = Math.max(8, (numHosts-1) * 8);
+        commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
+            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(maxPoolSize * 2),
+            new DefaultSolrThreadFactory("cmdDistribExecutor"));
+      }
+      executor = commExecutor;
+    }
+    
+    completionService = new ExecutorCompletionService<Request>(executor);
+    pending = new HashSet<Future<Request>>();
   }
   
   public void finish() {
@@ -297,10 +311,7 @@ public class SolrCmdDistributor {
   }
   
   public void submit(final Request sreq) {
-    if (completionService == null) {
-      completionService = new ExecutorCompletionService<Request>(commExecutor);
-      pending = new HashSet<Future<Request>>();
-    }
+
     final String url = sreq.node.getUrl();
 
     Callable<Request> task = new Callable<Request>() {
@@ -502,4 +513,40 @@ public class SolrCmdDistributor {
       return nodeProps;
     }
   }
+  
+  public class BoundedExecutor extends ThreadPoolExecutor {
+    private final Semaphore semaphore;
+    
+    public BoundedExecutor(int corePoolSize,
+        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+      this.semaphore = new Semaphore(maximumPoolSize);
+    }
+
+    @Override
+    public void execute(final Runnable command) {
+      try {
+        semaphore.acquire();
+      } catch (InterruptedException e1) {
+        throw new RuntimeException();
+      }
+      try {
+        super.execute(new Runnable() {
+          public void run() {
+            try {
+              command.run();
+            } finally {
+              semaphore.release();
+            }
+          }
+        });
+      } catch (RejectedExecutionException e) {
+        semaphore.release();
+        throw e;
+      }
+    }
+}
 }
+
+

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Jul 22 16:54:26 2012
@@ -130,6 +130,8 @@ public class DistributedUpdateProcessor 
   private boolean forwardToLeader = false;
   private List<Node> nodes;
 
+  private int numNodes;
+
   
   public DistributedUpdateProcessor(SolrQueryRequest req,
       SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -164,7 +166,7 @@ public class DistributedUpdateProcessor 
       collection = cloudDesc.getCollectionName();
     }
     
-    cmdDistrib = new SolrCmdDistributor();
+    cmdDistrib = new SolrCmdDistributor(numNodes);
   }
 
   private List<Node> setupRequest(int hash) {
@@ -172,6 +174,9 @@ public class DistributedUpdateProcessor 
 
     // if we are in zk mode...
     if (zkEnabled) {
+      // set num nodes
+      numNodes = zkController.getCloudState().getLiveNodes().size();
+      
       // the leader is...
       // TODO: if there is no leader, wait and look again
       // TODO: we are reading the leader from zk every time - we should cache

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Sun Jul 22 16:54:26 2012
@@ -82,7 +82,7 @@ public class SolrCmdDistributorTest exte
   public void doTest() throws Exception {
     //del("*:*");
     
-    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor();
+    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8);
     
     ModifiableSolrParams params = new ModifiableSolrParams();
     List<Node> nodes = new ArrayList<Node>();
@@ -116,7 +116,7 @@ public class SolrCmdDistributorTest exte
     nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
     
     // add another 2 docs to control and 3 to client
-    cmdDistrib = new SolrCmdDistributor();
+    cmdDistrib = new SolrCmdDistributor(8);
     cmd.solrDoc = sdoc("id", 2);
     cmdDistrib.distribAdd(cmd, nodes, params);
     
@@ -149,7 +149,7 @@ public class SolrCmdDistributorTest exte
     DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
     dcmd.id = "2";
     
-    cmdDistrib = new SolrCmdDistributor();
+    cmdDistrib = new SolrCmdDistributor(8);
     cmdDistrib.distribDelete(dcmd, nodes, params);
     
     cmdDistrib.distribCommit(ccmd, nodes, params);