You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/03 00:01:56 UTC
svn commit: r1368727 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/update/
solr/core/src/java/org/apache/solr/update/processor/
solr/core/src/java/org/apache/solr/util/
Author: markrmiller
Date: Thu Aug 2 22:01:56 2012
New Revision: 1368727
URL: http://svn.apache.org/viewvc?rev=1368727&view=rev
Log:
SOLR-3658: SolrCmdDistributor can briefly create spikes of threads in the thousands - second pass
Added:
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java
- copied unchanged from r1368725, lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1368727&r1=1368726&r2=1368727&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Thu Aug 2 22:01:56 2012
@@ -24,7 +24,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorComp
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -48,6 +48,7 @@ import org.apache.solr.common.cloud.ZkCo
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.AdjustableSemaphore;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,11 +63,12 @@ public class SolrCmdDistributor {
static BoundedExecutor commExecutor;
static final HttpClient client;
+ static AdjustableSemaphore semaphore;
static {
ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 200);
- params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 8);
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 500);
+ params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 16);
client = HttpClientUtil.createClient(params);
}
@@ -92,14 +94,22 @@ public class SolrCmdDistributor {
}
public SolrCmdDistributor(int numHosts) {
-
+ int maxPoolSize = Math.max(8, (numHosts-1) * 8);
BoundedExecutor executor = null;
synchronized (SolrCmdDistributor.class) {
- if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) {
+ if (semaphore == null) {
+ semaphore = new AdjustableSemaphore(maxPoolSize);
+ }
+
+ if (maxPoolSize != semaphore.getMaxPermits()) {
+ // set the permits to match maxPoolSize whenever they differ
+ semaphore.setMaxPermits(maxPoolSize);
+ }
+
+ if (commExecutor == null || commExecutor.getMaximumPoolSize() != maxPoolSize) {
// we don't shut down the previous because all its threads will die
- int maxPoolSize = Math.max(8, (numHosts-1) * 8);
commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
- TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(maxPoolSize * 2),
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("cmdDistribExecutor"));
}
executor = commExecutor;
@@ -343,11 +353,17 @@ public class SolrCmdDistributor {
} else {
clonedRequest.rspCode = -1;
}
+ } finally {
+ semaphore.release();
}
return clonedRequest;
}
};
-
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException();
+ }
pending.add(completionService.submit(task));
}
@@ -517,36 +533,33 @@ public class SolrCmdDistributor {
public class BoundedExecutor extends ThreadPoolExecutor {
private final Semaphore semaphore;
- public BoundedExecutor(int corePoolSize,
- int maximumPoolSize, long keepAliveTime, TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ public BoundedExecutor(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, Integer.MAX_VALUE, keepAliveTime, unit, workQueue,
+ threadFactory);
this.semaphore = new Semaphore(maximumPoolSize);
}
-
+
@Override
public void execute(final Runnable command) {
+// try {
+// System.out.println("semaphore aq:" + semaphore.availablePermits());
+// semaphore.acquire();
+// System.out.println("aquired");
+// } catch (InterruptedException e1) {
+// throw new RuntimeException();
+// }
try {
- semaphore.acquire();
- } catch (InterruptedException e1) {
- throw new RuntimeException();
- }
- try {
- super.execute(new Runnable() {
- public void run() {
- try {
- command.run();
- } finally {
- semaphore.release();
- }
- }
- });
+ super.execute(command);
} catch (RejectedExecutionException e) {
- semaphore.release();
throw e;
+ } finally {
+// semaphore.release();
+// System.out.println("semaphore re:" + semaphore.availablePermits());
}
}
-}
+ }
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1368727&r1=1368726&r2=1368727&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Aug 2 22:01:56 2012
@@ -158,17 +158,20 @@ public class DistributedUpdateProcessor
CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware();
+ zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
+ if (zkEnabled) {
+ numNodes = zkController.getZkStateReader().getCloudState().getLiveNodes().size();
+ }
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
-
- zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
+
cloudDesc = coreDesc.getCloudDescriptor();
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
}
-
+
cmdDistrib = new SolrCmdDistributor(numNodes);
}