You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/03 00:01:56 UTC

svn commit: r1368727 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/update/ solr/core/src/java/org/apache/solr/update/processor/ solr/core/src/java/org/apache/solr/util/

Author: markrmiller
Date: Thu Aug  2 22:01:56 2012
New Revision: 1368727

URL: http://svn.apache.org/viewvc?rev=1368727&view=rev
Log:
SOLR-3658: SolrCmdDistributor can briefly create spikes of threads in the thousands - second pass

Added:
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java
      - copied unchanged from r1368725, lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/AdjustableSemaphore.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1368727&r1=1368726&r2=1368727&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Thu Aug  2 22:01:56 2012
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorComp
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -48,6 +48,7 @@ import org.apache.solr.common.cloud.ZkCo
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.AdjustableSemaphore;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,11 +63,12 @@ public class SolrCmdDistributor {
   static BoundedExecutor commExecutor;
 
   static final HttpClient client;
+  static AdjustableSemaphore semaphore;
   
   static {
     ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 200);
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 8);
+    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 500);
+    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 16);
     client = HttpClientUtil.createClient(params);
   }
   
@@ -92,14 +94,22 @@ public class SolrCmdDistributor {
   }
   
   public SolrCmdDistributor(int numHosts) {
-
+    int maxPoolSize = Math.max(8, (numHosts-1) * 8);
     BoundedExecutor executor = null;
     synchronized (SolrCmdDistributor.class) {
-      if (commExecutor == null || commExecutor.getMaximumPoolSize() != numHosts) {
+      if (semaphore == null) {
+        semaphore = new AdjustableSemaphore(maxPoolSize);
+      }
+      
+      if (maxPoolSize != semaphore.getMaxPermits()) {
+        // raise the permits to match maxPoolSize
+        semaphore.setMaxPermits(maxPoolSize);
+      }
+      
+      if (commExecutor == null || commExecutor.getMaximumPoolSize() != maxPoolSize) {
         // we don't shutdown the previous because all it's threads will die
-        int maxPoolSize = Math.max(8, (numHosts-1) * 8);
         commExecutor = new BoundedExecutor(0, maxPoolSize, 5,
-            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(maxPoolSize * 2),
+            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
             new DefaultSolrThreadFactory("cmdDistribExecutor"));
       }
       executor = commExecutor;
@@ -343,11 +353,17 @@ public class SolrCmdDistributor {
           } else {
             clonedRequest.rspCode = -1;
           }
+        } finally {
+          semaphore.release();
         }
         return clonedRequest;
       }
     };
-    
+    try {
+      semaphore.acquire();
+    } catch (InterruptedException e) {
+      throw new RuntimeException();
+    }
     pending.add(completionService.submit(task));
     
   }
@@ -517,36 +533,33 @@ public class SolrCmdDistributor {
   public class BoundedExecutor extends ThreadPoolExecutor {
     private final Semaphore semaphore;
     
-    public BoundedExecutor(int corePoolSize,
-        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
-        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
-      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+    public BoundedExecutor(int corePoolSize, int maximumPoolSize,
+        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+        ThreadFactory threadFactory) {
+      super(corePoolSize, Integer.MAX_VALUE, keepAliveTime, unit, workQueue,
+          threadFactory);
       this.semaphore = new Semaphore(maximumPoolSize);
     }
-
+    
     @Override
     public void execute(final Runnable command) {
+//      try {
+//        System.out.println("semaphore aq:" + semaphore.availablePermits());
+//        semaphore.acquire();
+//        System.out.println("aquired");
+//      } catch (InterruptedException e1) {
+//        throw new RuntimeException();
+//      }
       try {
-        semaphore.acquire();
-      } catch (InterruptedException e1) {
-        throw new RuntimeException();
-      }
-      try {
-        super.execute(new Runnable() {
-          public void run() {
-            try {
-              command.run();
-            } finally {
-              semaphore.release();
-            }
-          }
-        });
+        super.execute(command);
       } catch (RejectedExecutionException e) {
-        semaphore.release();
         throw e;
+      } finally {
+//        semaphore.release();
+//        System.out.println("semaphore re:" + semaphore.availablePermits());
       }
     }
-}
+  }
 }
 
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1368727&r1=1368726&r2=1368727&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Aug  2 22:01:56 2012
@@ -158,17 +158,20 @@ public class DistributedUpdateProcessor 
     CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
     
     this.zkEnabled  = coreDesc.getCoreContainer().isZooKeeperAware();
+    zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
+    if (zkEnabled) {
+      numNodes =  zkController.getZkStateReader().getCloudState().getLiveNodes().size();
+    }
     //this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
 
-    
-    zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
+   
     
     cloudDesc = coreDesc.getCloudDescriptor();
     
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
     }
-    
+
     cmdDistrib = new SolrCmdDistributor(numNodes);
   }