You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2013/09/17 16:53:48 UTC

svn commit: r1524080 - in /lucene/dev/branches/lucene_solr_4_5: ./ solr/ solr/CHANGES.txt solr/core/ solr/core/src/java/org/apache/solr/request/SimpleFacets.java

Author: dsmiley
Date: Tue Sep 17 14:53:48 2013
New Revision: 1524080

URL: http://svn.apache.org/r1524080
Log:
SOLR-2548: Simplified multi-threading of facet.threads logic

Modified:
    lucene/dev/branches/lucene_solr_4_5/   (props changed)
    lucene/dev/branches/lucene_solr_4_5/solr/   (props changed)
    lucene/dev/branches/lucene_solr_4_5/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/lucene_solr_4_5/solr/core/   (props changed)
    lucene/dev/branches/lucene_solr_4_5/solr/core/src/java/org/apache/solr/request/SimpleFacets.java

Modified: lucene/dev/branches/lucene_solr_4_5/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_5/solr/CHANGES.txt?rev=1524080&r1=1524079&r2=1524080&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_5/solr/CHANGES.txt (original)
+++ lucene/dev/branches/lucene_solr_4_5/solr/CHANGES.txt Tue Sep 17 14:53:48 2013
@@ -118,7 +118,7 @@ New Features
 * SOLR-2548: Allow multiple threads to be specified for faceting. When threading, one
   can specify facet.threads to parallelize loading the uninverted fields. In at least
   one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta,
-  Gun Akkor via Erick Erickson)
+  Gun Akkor via Erick Erickson, David Smiley)
 
 * SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node
   update forwarding.  (Joel Bernstein, Shikhar Bhushan, Mark Miller)

Modified: lucene/dev/branches/lucene_solr_4_5/solr/core/src/java/org/apache/solr/request/SimpleFacets.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_5/solr/core/src/java/org/apache/solr/request/SimpleFacets.java?rev=1524080&r1=1524079&r2=1524080&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_5/solr/core/src/java/org/apache/solr/request/SimpleFacets.java (original)
+++ lucene/dev/branches/lucene_solr_4_5/solr/core/src/java/org/apache/solr/request/SimpleFacets.java Tue Sep 17 14:53:48 2013
@@ -17,26 +17,6 @@
 
 package org.apache.solr.request;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.DocsEnum;
 import org.apache.lucene.index.Fields;
@@ -100,6 +80,26 @@ import org.apache.solr.util.DateMathPars
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.LongPriorityQueue;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A class that generates simple Facet information for a request.
  *
@@ -525,6 +525,7 @@ public class SimpleFacets {
    * @see #getFieldMissingCount
    * @see #getFacetTermEnumCounts
    */
+  @SuppressWarnings("unchecked")
   public NamedList<Object> getFacetFieldCounts()
       throws IOException, SyntaxError {
 
@@ -534,69 +535,67 @@ public class SimpleFacets {
       return res;
     }
 
+    // Passing a negative number for FACET_THREADS implies an unlimited number of threads is acceptable.
+    // Also, a subtlety of directExecutor is that no matter how many times you "submit" a job, it's really
+    // just a method call in that it's run by the calling thread.
     int maxThreads = req.getParams().getInt(FacetParams.FACET_THREADS, 0);
     Executor executor = maxThreads == 0 ? directExecutor : facetExecutor;
+    final Semaphore semaphore = new Semaphore((maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads);
+    List<Future<NamedList>> futures = new ArrayList<Future<NamedList>>(facetFs.length);
 
-    // passing a negative number for FACET_THREADS implies an unlimited number of threads is acceptable.
-    // Also, a subtlety of directeExecutor is that no matter how many times you "submit" a job, it's really
-    // just a method call in that it's run by this thread.
-    maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads;
-    CompletionService completionService = new ExecutorCompletionService(executor);
-    LinkedList<Callable> pending = new LinkedList<Callable>();
-    for (String f : facetFs) {
-      parseParams(FacetParams.FACET_FIELD, f);
-      final String termList = localParams == null ? null : localParams.get(CommonParams.TERMS);
-      final String workerKey = key;
-      final String workerFacetValue = facetValue;
-      final DocSet workerBase = this.docs;
-      Callable worker = new Callable() {
-        @Override
-        public Object call() throws Exception {
-          NamedList<Object> result = new SimpleOrderedMap<Object>();
-          try {
-            if(termList != null) {
-              result.add(workerKey, getListedTermCounts(workerFacetValue, termList, workerBase));
-            } else {
-              result.add(workerKey, getTermCounts(workerFacetValue, workerBase));
+    try {
+      //Loop over fields; submit to executor, keeping the future
+      for (String f : facetFs) {
+        parseParams(FacetParams.FACET_FIELD, f);
+        final String termList = localParams == null ? null : localParams.get(CommonParams.TERMS);
+        final String workerKey = key;
+        final String workerFacetValue = facetValue;
+        final DocSet workerBase = this.docs;
+        Callable<NamedList> callable = new Callable<NamedList>() {
+          @Override
+          public NamedList call() throws Exception {
+            try {
+              NamedList<Object> result = new SimpleOrderedMap<Object>();
+              if(termList != null) {
+                result.add(workerKey, getListedTermCounts(workerFacetValue, termList, workerBase));
+              } else {
+                result.add(workerKey, getTermCounts(workerFacetValue, workerBase));
+              }
+              return result;
+            } catch (SolrException se) {
+              throw se;
+            } catch (Exception e) {
+              throw new SolrException(ErrorCode.SERVER_ERROR,
+                                      "Exception during facet.field: " + workerFacetValue, e.getCause());
+            } finally {
+              semaphore.release();
             }
-          } catch (SolrException se) {
-            throw se;
-          } catch (Exception e){
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                                    "Exception during facet.field: " + workerFacetValue, e.getCause());
           }
-          return result;
-        }
-      };
-      if (--maxThreads >= 0) {
-        completionService.submit(worker);
-      } else {
-        pending.add(worker);
-      }
-    }
-    for (String f : facetFs) {
-      NamedList taskResult;
-      try {
-        Future future = completionService.take();
-        taskResult = (NamedList)future.get();
-        if (taskResult != null) {
-          res.addAll(taskResult);
-        }
-        if (pending.isEmpty() == false) {
-          completionService.submit(pending.removeFirst());
-        }
-      } catch (InterruptedException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Processing of facet fields InterruptedException", e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        if (cause instanceof SolrException) {
-          throw (SolrException) cause;
-        }
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            "Processing of facet fields ExecutionException ", e);
+        };
+
+        RunnableFuture<NamedList> runnableFuture = new FutureTask<NamedList>(callable);
+        semaphore.acquire();//may block and/or interrupt
+        executor.execute(runnableFuture);//releases semaphore when done
+        futures.add(runnableFuture);
+      }//facetFs loop
+
+      //Loop over futures to get the values. The order is the same as facetFs but shouldn't matter.
+      for (Future<NamedList> future : futures) {
+        res.addAll(future.get());
+      }
+      assert semaphore.availablePermits() >= maxThreads;
+    } catch (InterruptedException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error while processing facet fields: InterruptedException", e);
+    } catch (ExecutionException ee) {
+      Throwable e = ee.getCause();//unwrap
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
       }
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error while processing facet fields: " + e.toString(), e);
     }
+
     return res;
   }
 
@@ -1205,7 +1204,7 @@ public class SimpleFacets {
   }
 
   private <T extends Comparable<T>> NamedList getFacetRangeCounts
-    (final SchemaField sf, 
+    (final SchemaField sf,
      final RangeEndpointCalculator<T> calc) throws IOException {
     
     final String f = sf.getName();
@@ -1338,7 +1337,7 @@ public class SimpleFacets {
    */
   protected int rangeCount(SchemaField sf, String low, String high,
                            boolean iLow, boolean iHigh) throws IOException {
-    Query rangeQ = sf.getType().getRangeQuery(null, sf,low,high,iLow,iHigh);
+    Query rangeQ = sf.getType().getRangeQuery(null, sf, low, high, iLow, iHigh);
     if (params.getBool(GroupParams.GROUP_FACET, false)) {
       return getGroupedFacetQueryCount(rangeQ);
     } else {
@@ -1352,8 +1351,8 @@ public class SimpleFacets {
   @Deprecated
   protected int rangeCount(SchemaField sf, Date low, Date high,
                            boolean iLow, boolean iHigh) throws IOException {
-    Query rangeQ = ((DateField)(sf.getType())).getRangeQuery(null, sf,low,high,iLow,iHigh);
-    return searcher.numDocs(rangeQ , docs);
+    Query rangeQ = ((DateField)(sf.getType())).getRangeQuery(null, sf, low, high, iLow, iHigh);
+    return searcher.numDocs(rangeQ, docs);
   }
   
   /**