You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/09/12 19:51:32 UTC

svn commit: r1522684 - in /lucene/dev/trunk/solr: CHANGES.txt solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java

Author: markrmiller
Date: Thu Sep 12 17:51:31 2013
New Revision: 1522684

URL: http://svn.apache.org/r1522684
Log:
SOLR-4816: Don't create "loads" of LBHttpSolrServer instances, shut down LBHttpSolrServer when appropriate, get collection from nonRoutableParams.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1522684&r1=1522683&r2=1522684&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Sep 12 17:51:31 2013
@@ -163,7 +163,7 @@ New Features
   Gun Akkor via Erick Erickson)
 
 * SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node
-  update forwarding.  (Joel Bernstein, Mark Miller)
+  update forwarding.  (Joel Bernstein, Shikhar Bhushan, Mark Miller)
   
 * SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. (Mark Miller)  
 

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1522684&r1=1522683&r2=1522684&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Thu Sep 12 17:51:31 2013
@@ -30,9 +30,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.http.client.HttpClient;
@@ -82,6 +84,7 @@ public class CloudSolrServer extends Sol
   private int zkClientTimeout = 10000;
   private volatile String defaultCollection;
   private final LBHttpSolrServer lbServer;
+  private final boolean shutdownLBHttpSolrServer;
   private HttpClient myClient;
   Random rand = new Random();
   
@@ -129,6 +132,7 @@ public class CloudSolrServer extends Sol
       this.myClient = HttpClientUtil.createClient(null);
       this.lbServer = new LBHttpSolrServer(myClient);
       this.updatesToLeaders = true;
+      shutdownLBHttpSolrServer = true;
   }
   
   public CloudSolrServer(String zkHost, boolean updatesToLeaders)
@@ -137,6 +141,7 @@ public class CloudSolrServer extends Sol
     this.myClient = HttpClientUtil.createClient(null);
     this.lbServer = new LBHttpSolrServer(myClient);
     this.updatesToLeaders = updatesToLeaders;
+    shutdownLBHttpSolrServer = true;
   }
 
   /**
@@ -148,6 +153,7 @@ public class CloudSolrServer extends Sol
     this.zkHost = zkHost;
     this.lbServer = lbServer;
     this.updatesToLeaders = true;
+    shutdownLBHttpSolrServer = false;
   }
   
   /**
@@ -160,6 +166,7 @@ public class CloudSolrServer extends Sol
     this.zkHost = zkHost;
     this.lbServer = lbServer;
     this.updatesToLeaders = updatesToLeaders;
+    shutdownLBHttpSolrServer = false;
   }
   
   public ResponseParser getParser() {
@@ -266,11 +273,8 @@ public class CloudSolrServer extends Sol
         routableParams.remove(param);
       }
     }
-    if (params == null) {
-      return null;
-    }
 
-    String collection = params.get("collection", defaultCollection);
+    String collection = nonRoutableParams.get("collection", defaultCollection);
     if (collection == null) {
       throw new SolrServerException("No collection param specified on request and no default collection has been set.");
     }
@@ -307,47 +311,45 @@ public class CloudSolrServer extends Sol
       return null;
     }
 
-    Iterator<Map.Entry<String, LBHttpSolrServer.Req>> it = routes.entrySet().iterator();
-
     long start = System.nanoTime();
-    if(this.parallelUpdates) {
-      ArrayBlockingQueue<RequestTask> finishedTasks = new ArrayBlockingQueue<RequestTask>(routes.size());
 
-      while (it.hasNext()) {
-        Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
-        String url = entry.getKey();
-        LBHttpSolrServer.Req lbRequest = entry.getValue();
-        threadPool.execute(new RequestTask(url, lbRequest, finishedTasks));
+    if (parallelUpdates) {
+      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<String, Future<NamedList<?>>>();
+      for (final Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
+        final String url = entry.getKey();
+        final LBHttpSolrServer.Req lbRequest = entry.getValue();
+        responseFutures.put(url, threadPool.submit(new Callable<NamedList<?>>() {
+          @Override
+          public NamedList<?> call() throws Exception {
+            return lbServer.request(lbRequest).getResponse();
+          }
+        }));
       }
 
-      while ((shardResponses.size() + exceptions.size()) != routes.size()) {
-        RequestTask requestTask = null;
+      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
+        final String url = entry.getKey();
+        final Future<NamedList<?>> responseFuture = entry.getValue();
         try {
-          requestTask = finishedTasks.take();
-        } catch (Exception e) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, e);
-        }
-
-        Exception e = requestTask.getException();
-        if (e != null) {
-          exceptions.add(requestTask.getLeader(), e);
-        } else {
-          shardResponses.add(requestTask.getLeader(), requestTask.getRsp().getResponse());
+          shardResponses.add(url, responseFuture.get());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          exceptions.add(url, e.getCause());
         }
       }
 
-      if(exceptions.size() > 0) {
+      if (exceptions.size() > 0) {
         throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
       }
     } else {
-      while (it.hasNext()) {
-        Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
+      for (Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
         String url = entry.getKey();
         LBHttpSolrServer.Req lbRequest = entry.getValue();
-        try{
+        try {
           NamedList rsp = lbServer.request(lbRequest).getResponse();
           shardResponses.add(url, rsp);
-        } catch(Exception e) {
+        } catch (Exception e) {
           throw new SolrServerException(e);
         }
       }
@@ -439,44 +441,6 @@ public class CloudSolrServer extends Sol
     return condensed;
   }
 
-  class RequestTask implements Runnable {
-
-    private LBHttpSolrServer.Req req;
-    private ArrayBlockingQueue<RequestTask> tasks;
-    private LBHttpSolrServer.Rsp rsp;
-    private String leader;
-    private Exception e;
-
-    public RequestTask(String leader, LBHttpSolrServer.Req req, ArrayBlockingQueue<RequestTask> tasks) {
-      this.req = req;
-      this.tasks = tasks;
-      this.leader = leader;
-    }
-
-    public void run() {
-      try {
-        LBHttpSolrServer lb = new LBHttpSolrServer(myClient);
-        this.rsp = lb.request(req);
-        this.tasks.add(this);
-      } catch (Exception e) {
-        this.e = e;
-        this.tasks.add(this);
-      }
-    }
-
-    public Exception getException() {
-      return e;
-    }
-
-    public String getLeader() {
-      return this.leader;
-    }
-
-    public LBHttpSolrServer.Rsp getRsp() {
-      return rsp;
-    }
-  }
-
   class RouteResponse extends NamedList {
     private NamedList routeResponses;
     private Map<String, LBHttpSolrServer.Req> routes;
@@ -697,6 +661,11 @@ public class CloudSolrServer extends Sol
         zkStateReader = null;
       }
     }
+    
+    if (shutdownLBHttpSolrServer) {
+      lbServer.shutdown();
+    }
+    
     if (myClient!=null) {
       myClient.getConnectionManager().shutdown();
     }