You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/03 02:01:59 UTC
svn commit: r1368760 - in /lucene/dev/trunk/solr/core/src/java/org/apache/solr: update/SolrCmdDistributor.java util/AdjustableSemaphore.java
Author: markrmiller
Date: Fri Aug 3 00:01:58 2012
New Revision: 1368760
URL: http://svn.apache.org/viewvc?rev=1368760&view=rev
Log:
SOLR-3658: cleanup
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1368760&r1=1368759&r2=1368760&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Fri Aug 3 00:01:58 2012
@@ -24,16 +24,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -59,11 +55,12 @@ public class SolrCmdDistributor {
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
// TODO: shut this thing down
- // TODO: this cannot be per instance...
- static BoundedExecutor commExecutor;
+ static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+ TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("cmdDistribExecutor"));;
static final HttpClient client;
- static AdjustableSemaphore semaphore;
+ static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
static {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -94,28 +91,14 @@ public class SolrCmdDistributor {
}
public SolrCmdDistributor(int numHosts) {
- int maxPoolSize = Math.max(8, (numHosts-1) * 8);
- BoundedExecutor executor = null;
- synchronized (SolrCmdDistributor.class) {
- if (semaphore == null) {
- semaphore = new AdjustableSemaphore(maxPoolSize);
- }
-
- if (maxPoolSize != semaphore.getMaxPermits()) {
- // raise the permits to match maxPoolSize
- semaphore.setMaxPermits(maxPoolSize);
- }
-
- if (commExecutor == null || commExecutor.getMaximumPoolSize() != maxPoolSize) {
- // we don't shutdown the previous because all it's threads will die
- commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
- TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- new DefaultSolrThreadFactory("cmdDistribExecutor"));
- }
- executor = commExecutor;
- }
+ int maxPermits = Math.max(8, (numHosts - 1) * 8);
- completionService = new ExecutorCompletionService<Request>(executor);
+ // limits how many tasks can actually execute at once
+ if (maxPermits != semaphore.getMaxPermits()) {
+ semaphore.setMaxPermits(maxPermits);
+ }
+
+ completionService = new ExecutorCompletionService<Request>(commExecutor);
pending = new HashSet<Future<Request>>();
}
@@ -530,36 +513,6 @@ public class SolrCmdDistributor {
}
}
- public class BoundedExecutor extends ThreadPoolExecutor {
- private final Semaphore semaphore;
-
- public BoundedExecutor(int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory) {
- super(corePoolSize, Integer.MAX_VALUE, keepAliveTime, unit, workQueue,
- threadFactory);
- this.semaphore = new Semaphore(maximumPoolSize);
- }
-
- @Override
- public void execute(final Runnable command) {
-// try {
-// System.out.println("semaphore aq:" + semaphore.availablePermits());
-// semaphore.acquire();
-// System.out.println("aquired");
-// } catch (InterruptedException e1) {
-// throw new RuntimeException();
-// }
- try {
- super.execute(command);
- } catch (RejectedExecutionException e) {
- throw e;
- } finally {
-// semaphore.release();
-// System.out.println("semaphore re:" + semaphore.availablePermits());
- }
- }
- }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java?rev=1368760&r1=1368759&r2=1368760&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java Fri Aug 3 00:01:58 2012
@@ -61,7 +61,7 @@ final public class AdjustableSemaphore {
this.semaphore.acquire();
}
- public int getMaxPermits() {
+ public synchronized int getMaxPermits() {
return maxPermits;
}