You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/03 02:01:59 UTC

svn commit: r1368760 - in /lucene/dev/trunk/solr/core/src/java/org/apache/solr: update/SolrCmdDistributor.java util/AdjustableSemaphore.java

Author: markrmiller
Date: Fri Aug  3 00:01:58 2012
New Revision: 1368760

URL: http://svn.apache.org/viewvc?rev=1368760&view=rev
Log:
SOLR-3658: cleanup

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1368760&r1=1368759&r2=1368760&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Fri Aug  3 00:01:58 2012
@@ -24,16 +24,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -59,11 +55,12 @@ public class SolrCmdDistributor {
   public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
   
   // TODO: shut this thing down
-  // TODO: this cannot be per instance...
-  static BoundedExecutor commExecutor;
+  static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5,
+      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+      new DefaultSolrThreadFactory("cmdDistribExecutor"));;
 
   static final HttpClient client;
-  static AdjustableSemaphore semaphore;
+  static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
   
   static {
     ModifiableSolrParams params = new ModifiableSolrParams();
@@ -94,28 +91,14 @@ public class SolrCmdDistributor {
   }
   
   public SolrCmdDistributor(int numHosts) {
-    int maxPoolSize = Math.max(8, (numHosts-1) * 8);
-    BoundedExecutor executor = null;
-    synchronized (SolrCmdDistributor.class) {
-      if (semaphore == null) {
-        semaphore = new AdjustableSemaphore(maxPoolSize);
-      }
-      
-      if (maxPoolSize != semaphore.getMaxPermits()) {
-        // raise the permits to match maxPoolSize
-        semaphore.setMaxPermits(maxPoolSize);
-      }
-      
-      if (commExecutor == null || commExecutor.getMaximumPoolSize() != maxPoolSize) {
-        // we don't shutdown the previous because all it's threads will die
-        commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
-            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-            new DefaultSolrThreadFactory("cmdDistribExecutor"));
-      }
-      executor = commExecutor;
-    }
+    int maxPermits = Math.max(8, (numHosts - 1) * 8);
     
-    completionService = new ExecutorCompletionService<Request>(executor);
+    // limits how many tasks can actually execute at once
+    if (maxPermits != semaphore.getMaxPermits()) {
+      semaphore.setMaxPermits(maxPermits);
+    }
+
+    completionService = new ExecutorCompletionService<Request>(commExecutor);
     pending = new HashSet<Future<Request>>();
   }
   
@@ -530,36 +513,6 @@ public class SolrCmdDistributor {
     }
   }
   
-  public class BoundedExecutor extends ThreadPoolExecutor {
-    private final Semaphore semaphore;
-    
-    public BoundedExecutor(int corePoolSize, int maximumPoolSize,
-        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
-        ThreadFactory threadFactory) {
-      super(corePoolSize, Integer.MAX_VALUE, keepAliveTime, unit, workQueue,
-          threadFactory);
-      this.semaphore = new Semaphore(maximumPoolSize);
-    }
-    
-    @Override
-    public void execute(final Runnable command) {
-//      try {
-//        System.out.println("semaphore aq:" + semaphore.availablePermits());
-//        semaphore.acquire();
-//        System.out.println("aquired");
-//      } catch (InterruptedException e1) {
-//        throw new RuntimeException();
-//      }
-      try {
-        super.execute(command);
-      } catch (RejectedExecutionException e) {
-        throw e;
-      } finally {
-//        semaphore.release();
-//        System.out.println("semaphore re:" + semaphore.availablePermits());
-      }
-    }
-  }
 }
 
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java?rev=1368760&r1=1368759&r2=1368760&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java Fri Aug  3 00:01:58 2012
@@ -61,7 +61,7 @@ final public class AdjustableSemaphore {
     this.semaphore.acquire();
   }
   
-  public int getMaxPermits() {
+  public synchronized int getMaxPermits() {
     return maxPermits;
   }