You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/07/22 18:54:26 UTC
svn commit: r1364355 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/update/
Author: markrmiller
Date: Sun Jul 22 16:54:26 2012
New Revision: 1364355
URL: http://svn.apache.org/viewvc?rev=1364355&view=rev
Log:
SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create spikes of threads in the thousands.
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sun Jul 22 16:54:26 2012
@@ -117,6 +117,9 @@ Bug Fixes
* SOLR-3660: Velocity: Link to admin page broken (janhoy)
+* SOLR-3658: Adding thousands of docs with one UpdateProcessorChain instance can briefly create
+ spikes of threads in the thousands. (yonik, Mark Miller)
+
Other Changes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sun Jul 22 16:54:26 2012
@@ -24,12 +24,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -49,17 +53,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-
public class SolrCmdDistributor {
private static final int MAX_RETRIES_ON_FORWARD = 6;
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
// TODO: shut this thing down
// TODO: this cannot be per instance...
- static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
- Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- new DefaultSolrThreadFactory("cmdDistribExecutor"));
+ static BoundedExecutor commExecutor;
static final HttpClient client;
@@ -91,8 +91,22 @@ public class SolrCmdDistributor {
ModifiableSolrParams params;
}
- public SolrCmdDistributor() {
-
+ public SolrCmdDistributor(int numHosts) {
+
+ BoundedExecutor executor = null;
+ synchronized (SolrCmdDistributor.class) {
+ if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) {
+ // we don't shut down the previous executor because all of its threads will die
+ int maxPoolSize = Math.max(8, (numHosts-1) * 8);
+ commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
+ TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(maxPoolSize * 2),
+ new DefaultSolrThreadFactory("cmdDistribExecutor"));
+ }
+ executor = commExecutor;
+ }
+
+ completionService = new ExecutorCompletionService<Request>(executor);
+ pending = new HashSet<Future<Request>>();
}
public void finish() {
@@ -297,10 +311,7 @@ public class SolrCmdDistributor {
}
public void submit(final Request sreq) {
- if (completionService == null) {
- completionService = new ExecutorCompletionService<Request>(commExecutor);
- pending = new HashSet<Future<Request>>();
- }
+
final String url = sreq.node.getUrl();
Callable<Request> task = new Callable<Request>() {
@@ -502,4 +513,40 @@ public class SolrCmdDistributor {
return nodeProps;
}
}
+
+ public class BoundedExecutor extends ThreadPoolExecutor {
+ private final Semaphore semaphore;
+
+ public BoundedExecutor(int corePoolSize,
+ int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ this.semaphore = new Semaphore(maximumPoolSize);
+ }
+
+ @Override
+ public void execute(final Runnable command) {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e1) {
+ throw new RuntimeException();
+ }
+ try {
+ super.execute(new Runnable() {
+ public void run() {
+ try {
+ command.run();
+ } finally {
+ semaphore.release();
+ }
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ semaphore.release();
+ throw e;
+ }
+ }
+}
}
+
+
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Jul 22 16:54:26 2012
@@ -130,6 +130,8 @@ public class DistributedUpdateProcessor
private boolean forwardToLeader = false;
private List<Node> nodes;
+ private int numNodes;
+
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -164,7 +166,7 @@ public class DistributedUpdateProcessor
collection = cloudDesc.getCollectionName();
}
- cmdDistrib = new SolrCmdDistributor();
+ cmdDistrib = new SolrCmdDistributor(numNodes);
}
private List<Node> setupRequest(int hash) {
@@ -172,6 +174,9 @@ public class DistributedUpdateProcessor
// if we are in zk mode...
if (zkEnabled) {
+ // set num nodes
+ numNodes = zkController.getCloudState().getLiveNodes().size();
+
// the leader is...
// TODO: if there is no leader, wait and look again
// TODO: we are reading the leader from zk every time - we should cache
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1364355&r1=1364354&r2=1364355&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Sun Jul 22 16:54:26 2012
@@ -82,7 +82,7 @@ public class SolrCmdDistributorTest exte
public void doTest() throws Exception {
//del("*:*");
- SolrCmdDistributor cmdDistrib = new SolrCmdDistributor();
+ SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(8);
ModifiableSolrParams params = new ModifiableSolrParams();
List<Node> nodes = new ArrayList<Node>();
@@ -116,7 +116,7 @@ public class SolrCmdDistributorTest exte
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
// add another 2 docs to control and 3 to client
- cmdDistrib = new SolrCmdDistributor();
+ cmdDistrib = new SolrCmdDistributor(8);
cmd.solrDoc = sdoc("id", 2);
cmdDistrib.distribAdd(cmd, nodes, params);
@@ -149,7 +149,7 @@ public class SolrCmdDistributorTest exte
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
dcmd.id = "2";
- cmdDistrib = new SolrCmdDistributor();
+ cmdDistrib = new SolrCmdDistributor(8);
cmdDistrib.distribDelete(dcmd, nodes, params);
cmdDistrib.distribCommit(ccmd, nodes, params);