You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by yo...@apache.org on 2010/01/12 02:20:55 UTC

svn commit: r898144 - in /lucene/solr/branches/cloud/src: java/org/apache/solr/handler/component/ solrj/org/apache/solr/client/solrj/impl/ test/org/apache/solr/ test/org/apache/solr/client/solrj/ test/org/apache/solr/cloud/

Author: yonik
Date: Tue Jan 12 01:20:54 2010
New Revision: 898144

URL: http://svn.apache.org/viewvc?rev=898144&view=rev
Log:
SOLR-1698: load balanced distrib search

Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java
    lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java Tue Jan 12 01:20:54 2010
@@ -21,6 +21,7 @@
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.RTimer;
 import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
@@ -33,6 +34,7 @@
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.apache.solr.core.SolrCore;
 import org.apache.lucene.queryParser.ParseException;
@@ -43,6 +45,7 @@
 import org.slf4j.LoggerFactory;
 import java.util.*;
 import java.util.concurrent.*;
+import java.net.MalformedURLException;
 
 /**
  *
@@ -353,6 +356,8 @@
 
 
   static HttpClient client;
+  static Random r = new Random();
+  static LBHttpSolrServer loadbalancer;
 
   static {
     MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
@@ -361,12 +366,23 @@
     mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
     mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
     // mgr.getParams().setStaleCheckingEnabled(false);
-    client = new HttpClient(mgr);    
+    client = new HttpClient(mgr);
+    try {
+      loadbalancer = new LBHttpSolrServer(client);
+    } catch (MalformedURLException e) {
+      // should be impossible since we're not passing any URLs here
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e);
+    }
   }
 
   CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
   Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
 
+  // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+  // This is primarily to keep track of what order we should use to query the replicas of a shard
+  // so that we use the same replica for all phases of a distributed request.
+  Map<String,List<String>> shardToURLs = new HashMap<String,List<String>>();
+
   HttpCommComponent() {
   }
 
@@ -390,7 +406,36 @@
     }
   }
 
+
+  // Not thread safe... don't use in Callable.
+  // Don't modify the returned URL list.
+  private List<String> getURLs(String shard) {
+    List<String> urls = shardToURLs.get(shard);
+    if (urls==null) {
+      urls = StrUtils.splitSmart(shard,"|",true);
+
+      // convert shard to URL
+      for (int i=0; i<urls.size(); i++) {
+        urls.set(i, "http://"+urls.get(i));
+      }
+
+      //
+      // Shuffle the list instead of using round-robin by default.
+      // This prevents accidental synchronization where multiple shards could get in sync
+      // and query the same replica at the same time.
+      //
+      if (urls.size() > 1)
+        Collections.shuffle(urls, r);
+      shardToURLs.put(shard, urls);
+    }
+    return urls;
+  }
+
+
   void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
+    // do this outside of the callable for thread safety reasons
+    final List<String> urls = getURLs(shard);
+
     Callable<ShardResponse> task = new Callable<ShardResponse>() {
       public ShardResponse call() throws Exception {
 
@@ -402,13 +447,9 @@
         long startTime = System.currentTimeMillis();
 
         try {
-          // String url = "http://" + shard + "/select";
-          String url = "http://" + shard;
-
           params.remove(CommonParams.WT); // use default (currently javabin)
           params.remove(CommonParams.VERSION);
 
-          SolrServer server = new CommonsHttpSolrServer(url, client);
           // SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
           // use generic request to avoid extra processing of queries
           QueryRequest req = new QueryRequest(params);
@@ -416,10 +457,17 @@
 
           // no need to set the response parser as binary is the default
           // req.setResponseParser(new BinaryResponseParser());
-          // srsp.rsp = server.request(req);
-          // srsp.rsp = server.query(sreq.params);
 
-          ssr.nl = server.request(req);
+          if (urls.size() <= 1) {
+            String url = urls.get(0);
+            srsp.setShardAddress(url);
+            SolrServer server = new CommonsHttpSolrServer(url, client);
+            ssr.nl = server.request(req);
+          } else {
+            LBHttpSolrServer.Rsp rsp = loadbalancer.request(new LBHttpSolrServer.Req(req, urls));
+            ssr.nl = rsp.getResponse();
+            srsp.setShardAddress(rsp.getServer());
+          }
         } catch (Throwable th) {
           srsp.setException(th);
           if (th instanceof SolrException) {

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java Tue Jan 12 01:20:54 2010
@@ -20,63 +20,68 @@
 import org.apache.solr.common.SolrException;
 
 public final class ShardResponse {
-	  private ShardRequest req;
-	  private String shard;
-	  private String shardAddress;  // the specific shard that this response was received from  
-	  private int rspCode;
-	  private Throwable exception;
-	  private SolrResponse rsp;
-
-	  public String toString() {
-	    return "ShardResponse:{shard="+shard+",shardAddress="+shardAddress
-	            +"\n\trequest=" + req
-	            +"\n\tresponse=" + rsp
-	            + (exception==null ? "" : "\n\texception="+ SolrException.toStr(exception)) 
-	            +"\n}";
-	  }
-	  
-	  public Throwable getException()
-	  {
-		  return exception;
-	  }
-	  
-	  public ShardRequest getShardRequest()
-	  {
-		  return req;
-	  }
-	  
-	  public SolrResponse getSolrResponse()
-	  {
-		  return rsp;
-	  }
-	  
-	  public String getShard()
-	  {
-		  return shard;
-	  }
-	  
-	  void setShardRequest(ShardRequest rsp)
-	  {
-		  this.req = rsp;
-	  }
-	  
-	  void setSolrResponse(SolrResponse rsp)
-	  {
-		  this.rsp = rsp;
-	  }
-	  
-	  void setShard(String shard)
-	  {
-		  this.shard = shard;
-	  }
-	  
-	  void setException(Throwable exception)
-	  {
-		  this.exception = exception;
-	  }
-	  
-	  void setResponseCode(int rspCode)
-	  {
-		  this.rspCode = rspCode;
-	  }
+  private ShardRequest req;
+  private String shard;
+  private String shardAddress;  // the specific shard that this response was received from
+  private int rspCode;
+  private Throwable exception;
+  private SolrResponse rsp;
+
+  public String toString() {
+    return "ShardResponse:{shard="+shard+",shardAddress="+shardAddress
+            +"\n\trequest=" + req
+            +"\n\tresponse=" + rsp
+            + (exception==null ? "" : "\n\texception="+ SolrException.toStr(exception))
+            +"\n}";
+  }
+
+  public Throwable getException()
+  {
+    return exception;
+  }
+
+  public ShardRequest getShardRequest()
+  {
+    return req;
+  }
+
+  public SolrResponse getSolrResponse()
+  {
+    return rsp;
+  }
+
+  public String getShard()
+  {
+    return shard;
+  }
+
+  void setShardRequest(ShardRequest rsp)
+  {
+    this.req = rsp;
+  }
+
+  void setSolrResponse(SolrResponse rsp)
+  {
+    this.rsp = rsp;
+  }
+
+  void setShard(String shard)
+  {
+    this.shard = shard;
+  }
+
+  void setException(Throwable exception)
+  {
+    this.exception = exception;
+  }
+
+  void setResponseCode(int rspCode)
+  {
+    this.rspCode = rspCode;
+  }
+
+  /** What was the shard address that returned this response.  Example:  "http://localhost:8983/solr" */
+  public String getShardAddress() { return this.shardAddress; }
+
+  void setShardAddress(String addr) { this.shardAddress = addr; }
 }

Modified: lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Tue Jan 12 01:20:54 2010
@@ -27,26 +27,24 @@
 import java.lang.ref.WeakReference;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.*;
 
 /**
- * LBHttpSolrServer or "LoadBalanced HttpSolrServer" is just a wrapper to CommonsHttpSolrServer. This is useful when you
+ * LBHttpSolrServer or "LoadBalanced HttpSolrServer" is a load balancing wrapper to CommonsHttpSolrServer. This is useful when you
  * have multiple SolrServers and the requests need to be Load Balanced among them. This should <b>NOT</b> be used for
  * indexing. Also see the <a href="http://wiki.apache.org/solr/LBHttpSolrServer">wiki</a> page.
  * <p/>
  * It offers automatic failover when a server goes down and it detects when the server comes back up.
  * <p/>
- * Load balancing is done using a simple roundrobin on the list of servers.
+ * Load balancing is done using a simple round-robin on the list of servers.
  * <p/>
  * If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
  * off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
  * This process is continued till it tries all the live servers. If atleast one server is alive, the request succeeds,
- * andif not it fails.
+ * and if not it fails.
  * <blockquote><pre>
  * SolrServer lbHttpSolrServer = new LBHttpSolrServer("http://host1:8080/solr/","http://host2:8080/solr","http://host2:8080/solr");
  * //or if you wish to pass the HttpClient do as follows
@@ -57,23 +55,33 @@
  * This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
  * <p/>
  * <b>When to use this?</b><br/> This can be used as a software load balancer when you do not wish to setup an external
- * load balancer. The code is relatively new and the API is currently experimental. Alternatives to this code are to use
+ * load balancer. Alternatives to this code are to use
  * a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
  * href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
  *
- * @version $Id$
  * @since solr 1.4
  */
 public class LBHttpSolrServer extends SolrServer {
-  private final CopyOnWriteArrayList<ServerWrapper> aliveServers = new CopyOnWriteArrayList<ServerWrapper>();
-  private final CopyOnWriteArrayList<ServerWrapper> zombieServers = new CopyOnWriteArrayList<ServerWrapper>();
+
+
+  // keys to the maps are currently of the form "http://localhost:8983/solr"
+  // which should be equivalent to CommonsHttpSolrServer.getBaseURL()
+  private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<String, ServerWrapper>();
+  // access to aliveServers should be synchronized on itself
+  
+  private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<String, ServerWrapper>();
+
+  // changes to aliveServers are reflected in this array, no need to synchronize
+  private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
+
+
   private ScheduledExecutorService aliveCheckExecutor;
 
   private HttpClient httpClient;
   private final AtomicInteger counter = new AtomicInteger(-1);
 
-  private ReentrantLock checkLock = new ReentrantLock();
   private static final SolrQuery solrQuery = new SolrQuery("*:*");
+  private static final BinaryResponseParser binaryParser = new BinaryResponseParser();
 
   static {
     solrQuery.setRows(0);
@@ -82,8 +90,13 @@
   private static class ServerWrapper {
     final CommonsHttpSolrServer solrServer;
 
-    // Used only by the thread in aliveCheckExecutor
-    long lastUsed, lastChecked;
+    long lastUsed;     // last time used for a real request
+    long lastChecked;  // last time checked for liveness
+
+    // "standard" servers are used by default.  They normally live in the alive list
+    // and move to the zombie list when unavailable.  When they become available again,
+    // they move back to the alive list.
+    boolean standard = true;
 
     int failedPings = 0;
 
@@ -94,33 +107,220 @@
     public String toString() {
       return solrServer.getBaseURL();
     }
+
+    public String getKey() {
+      return solrServer.getBaseURL();
+    }
+
+    @Override
+    public int hashCode() {
+      return this.getKey().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (!(obj instanceof ServerWrapper)) return false;
+      return this.getKey().equals(((ServerWrapper)obj).getKey());
+    }
+  }
+
+  public static class Req {
+    protected SolrRequest request;
+    protected List<String> servers;
+    protected int numDeadServersToTry;
+
+    public Req(SolrRequest request, List<String> servers) {
+      this.request = request;
+      this.servers = servers;
+      this.numDeadServersToTry = servers.size();
+    }
+
+    public SolrRequest getRequest() {
+      return request;
+    }
+    public List<String> getServers() {
+      return servers;
+    }
+
+    /** @return the number of dead servers to try if there are no live servers left */
+    public int getNumDeadServersToTry() {
+      return numDeadServersToTry;
+    }
+
+    /** Sets the number of dead servers to try if there are no live servers left.
+     * Defaults to the number of servers in this request. */
+    public void setNumDeadServersToTry(int numDeadServersToTry) {
+      this.numDeadServersToTry = numDeadServersToTry;
+    }
+  }
+
+  public static class Rsp {
+    protected String server;
+    protected NamedList<Object> rsp;
+
+    /** The response from the server */
+    public NamedList<Object> getResponse() {
+      return rsp;
+    }
+
+    /** The server that returned the response */
+    public String getServer() {
+      return server;
+    }
   }
 
   public LBHttpSolrServer(String... solrServerUrls) throws MalformedURLException {
     this(new HttpClient(new MultiThreadedHttpConnectionManager()), solrServerUrls);
   }
 
+  /** The provided httpClient should use a multi-threaded connection manager */ 
   public LBHttpSolrServer(HttpClient httpClient, String... solrServerUrl)
           throws MalformedURLException {
     this(httpClient, new BinaryResponseParser(), solrServerUrl);
   }
 
+  /** The provided httpClient should use a multi-threaded connection manager */  
   public LBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl)
           throws MalformedURLException {
     this.httpClient = httpClient;
     for (String s : solrServerUrl) {
-      aliveServers.add(new ServerWrapper(new CommonsHttpSolrServer(s, httpClient, parser)));
+      ServerWrapper wrapper = new ServerWrapper(new CommonsHttpSolrServer(s, httpClient, parser));
+      aliveServers.put(wrapper.getKey(), wrapper);
+    }
+    updateAliveList();
+  }
+
+  public static String normalize(String server) {
+    if (server.endsWith("/"))
+      server = server.substring(0, server.length() - 1);
+    return server;
+  }
+
+  protected CommonsHttpSolrServer makeServer(String server) throws MalformedURLException {
+    return new CommonsHttpSolrServer(server, httpClient, binaryParser);
+  }
+
+
+
+  /**
+   * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
+   * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
+   * time, or until a test request on that server succeeds.
+   *
+   * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
+   * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
+   * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
+   *
+   * If no live servers are found a SolrServerException is thrown.
+   *
+   * @param req contains both the request as well as the list of servers to query
+   *
+   * @return the result of the request
+   *
+   * @throws SolrServerException
+   * @throws IOException
+   */
+  public Rsp request(Req req) throws SolrServerException, IOException {
+    Rsp rsp = new Rsp();
+    Exception ex = null;
+
+    List<ServerWrapper> skipped = new ArrayList<ServerWrapper>(req.getNumDeadServersToTry());
+
+    for (String serverStr : req.getServers()) {
+      serverStr = normalize(serverStr);
+      // if the server is currently a zombie, just skip to the next one
+      ServerWrapper wrapper = zombieServers.get(serverStr);
+      if (wrapper != null) {
+        // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
+        if (skipped.size() < req.getNumDeadServersToTry())
+          skipped.add(wrapper);
+        continue;
+      }
+      rsp.server = serverStr;
+      CommonsHttpSolrServer server = makeServer(serverStr);
+
+      try {
+        rsp.rsp = server.request(req.getRequest());
+        return rsp; // SUCCESS
+      } catch (SolrException e) {
+        // Server is alive but the request was malformed or invalid
+        throw e;
+      } catch (SolrServerException e) {
+        if (e.getRootCause() instanceof IOException) {
+          ex = e;
+          wrapper = new ServerWrapper(server);
+          wrapper.lastUsed = System.currentTimeMillis();
+          wrapper.standard = false;
+          zombieServers.put(wrapper.getKey(), wrapper);
+          startAliveCheckExecutor();
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        throw new SolrServerException(e);
+      }
+    }
+
+    // try the servers we previously skipped
+    for (ServerWrapper wrapper : skipped) {
+      try {
+        rsp.rsp = wrapper.solrServer.request(req.getRequest());
+        zombieServers.remove(wrapper.getKey());
+        return rsp; // SUCCESS
+      } catch (SolrException e) {
+        // Server is alive but the request was malformed or invalid
+        zombieServers.remove(wrapper.getKey());
+        throw e;
+      } catch (SolrServerException e) {
+        if (e.getRootCause() instanceof IOException) {
+          ex = e;
+          // already a zombie, no need to re-add
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        throw new SolrServerException(e);
+      }
+    }
+
+
+    if (ex == null) {
+      throw new SolrServerException("No live SolrServers available to handle this request");
+    } else {
+      throw new SolrServerException("No live SolrServers available to handle this request", ex);
+    }
+
+  }  
+
+
+
+  private void updateAliveList() {
+    synchronized (aliveServers) {
+      aliveServerList = aliveServers.values().toArray(new ServerWrapper[aliveServers.size()]);
+    }
+  }
+
+  private ServerWrapper removeFromAlive(String key) {
+    synchronized (aliveServers) {
+      ServerWrapper wrapper = aliveServers.remove(key);
+      if (wrapper != null)
+        updateAliveList();
+      return wrapper;
+    }
+  }
+
+  private void addToAlive(ServerWrapper wrapper) {
+    synchronized (aliveServers) {
+      ServerWrapper prev = aliveServers.put(wrapper.getKey(), wrapper);
+      // TODO: warn if there was a previous entry?
+      updateAliveList();
     }
   }
 
   public void addSolrServer(String server) throws MalformedURLException {
     CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(server, httpClient);
-    checkLock.lock();
-    try {
-      aliveServers.add(new ServerWrapper(solrServer));
-    } finally {
-      checkLock.unlock();
-    }
+    addToAlive(new ServerWrapper(solrServer));
   }
 
   public String removeSolrServer(String server) {
@@ -132,25 +332,11 @@
     if (server.endsWith("/")) {
       server = server.substring(0, server.length() - 1);
     }
-    this.checkLock.lock();
-    try {
-      for (ServerWrapper serverWrapper : aliveServers) {
-        if (serverWrapper.solrServer.getBaseURL().equals(server)) {
-          aliveServers.remove(serverWrapper);
-          return serverWrapper.solrServer.getBaseURL();
-        }
-      }
-      if (zombieServers.isEmpty()) return null;
 
-      for (ServerWrapper serverWrapper : zombieServers) {
-        if (serverWrapper.solrServer.getBaseURL().equals(server)) {
-          zombieServers.remove(serverWrapper);
-          return serverWrapper.solrServer.getBaseURL();
-        }
-      }
-    } finally {
-      checkLock.unlock();
-    }
+    // there is a small race condition here - if the server is in the process of being moved between
+    // lists, we could fail to remove it.
+    removeFromAlive(server);
+    zombieServers.remove(server);
     return null;
   }
 
@@ -174,9 +360,10 @@
   }
 
   /**
-   * Tries to query a live server. If no live servers are found it throws a SolrServerException. If the request failed
-   * due to IOException then the live server is moved to dead pool and the request is retried on another live server if
-   * available. If all live servers are exhausted then a SolrServerException is thrown.
+   * Tries to query a live server. A SolrServerException is thrown if all servers are dead.
+   * If the request failed due to IOException then the live server is moved to dead pool and the request is
+   * retried on another live server.  After live servers are exhausted, any servers previously marked as dead
+   * will be tried before failing the request.
    *
    * @param request the SolrRequest.
    *
@@ -187,41 +374,69 @@
    */
   public NamedList<Object> request(final SolrRequest request)
           throws SolrServerException, IOException {
-    int count = counter.incrementAndGet();
-    int attempts = 0;
-    Exception ex;
-    int startSize = aliveServers.size();
-    while (true) {
-      int size = aliveServers.size();
-      if (size < 1) throw new SolrServerException("No live SolrServers available to handle this request");
-      ServerWrapper solrServer;
+    Exception ex = null;
+    ServerWrapper[] serverList = aliveServerList;
+    
+    int maxTries = serverList.length;
+    Map<String,ServerWrapper> justFailed = null;
+
+    for (int attempts=0; attempts<maxTries; attempts++) {
+      int count = counter.incrementAndGet();      
+      ServerWrapper wrapper = serverList[count % serverList.length];
+      wrapper.lastUsed = System.currentTimeMillis();
+
       try {
-        solrServer = aliveServers.get(count % size);
-      } catch (IndexOutOfBoundsException e) {
-        //this list changes dynamically. so it is expected to get IndexOutOfBoundsException
-        continue;
+        return wrapper.solrServer.request(request);
+      } catch (SolrException e) {
+        // Server is alive but the request was malformed or invalid
+        throw e;
+      } catch (SolrServerException e) {
+        if (e.getRootCause() instanceof IOException) {
+          ex = e;
+          moveAliveToDead(wrapper);
+          if (justFailed == null) justFailed = new HashMap<String,ServerWrapper>();
+          justFailed.put(wrapper.getKey(), wrapper);
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        throw new SolrServerException(e);
       }
+    }
+
+
+    // try other standard servers that we didn't try just now
+    for (ServerWrapper wrapper : zombieServers.values()) {
+      if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue;
       try {
-        return solrServer.solrServer.request(request);
+        NamedList<Object> rsp = wrapper.solrServer.request(request);
+        // remove from zombie list *before* adding to alive to avoid a race that could lose a server
+        zombieServers.remove(wrapper.getKey());
+        addToAlive(wrapper);
+        return rsp;
       } catch (SolrException e) {
         // Server is alive but the request was malformed or invalid
         throw e;
       } catch (SolrServerException e) {
         if (e.getRootCause() instanceof IOException) {
           ex = e;
-          moveAliveToDead(solrServer);
+          // still dead
         } else {
           throw e;
         }
       } catch (Exception e) {
         throw new SolrServerException(e);
       }
-      attempts++;
-      if (attempts >= startSize)
-        throw new SolrServerException("No live SolrServers available to handle this request", ex);
     }
-  }
 
+
+    if (ex == null) {
+      throw new SolrServerException("No live SolrServers available to handle this request");
+    } else {
+      throw new SolrServerException("No live SolrServers available to handle this request", ex);
+    }
+  }
+  
   /**
    * Takes up one dead server and check for aliveness. The check is done in a roundrobin. Each server is checked for
    * aliveness once in 'x' millis where x is decided by the setAliveCheckinterval() or it is defaulted to 1 minute
@@ -230,39 +445,44 @@
    */
   private void checkAZombieServer(ServerWrapper zombieServer) {
     long currTime = System.currentTimeMillis();
-    checkLock.lock();
     try {
       zombieServer.lastChecked = currTime;
       QueryResponse resp = zombieServer.solrServer.query(solrQuery);
       if (resp.getStatus() == 0) {
-        //server has come back up
-        zombieServer.lastUsed = currTime;
-        zombieServers.remove(zombieServer);
-        aliveServers.add(zombieServer);
-        zombieServer.failedPings = 0;
+        // server has come back up.
+        // make sure to remove from zombies before adding to alive to avoid a race condition
+        // where another thread could mark it down, move it back to zombie, and then we delete
+        // from zombie and lose it forever.
+        ServerWrapper wrapper = zombieServers.remove(zombieServer.getKey());
+        if (wrapper != null) {
+          wrapper.failedPings = 0;
+          if (wrapper.standard) {
+            addToAlive(wrapper);
+          }
+        } else {
+          // something else already moved the server from zombie to alive
+        }
       }
     } catch (Exception e) {
+      //Expected. The server is still down.
       zombieServer.failedPings++;
-      //Expected . The server is still down
-    } finally {
-      checkLock.unlock();
-    }
-  }
 
-  private void moveAliveToDead(ServerWrapper solrServer) {
-    checkLock.lock();
-    try {
-      boolean result = aliveServers.remove(solrServer);
-      if (result) {
-        if (zombieServers.addIfAbsent(solrServer)) {
-          startAliveCheckExecutor();
-        }
+      // If the server doesn't belong in the standard set belonging to this load balancer
+      // then simply drop it after a certain number of failed pings.
+      if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
+        zombieServers.remove(zombieServer.getKey());
       }
-    } finally {
-      checkLock.unlock();
     }
   }
 
+  private void moveAliveToDead(ServerWrapper wrapper) {
+    wrapper = removeFromAlive(wrapper.getKey());
+    if (wrapper == null)
+      return;  // another thread already detected the failure and removed it
+    zombieServers.put(wrapper.getKey(), wrapper);
+    startAliveCheckExecutor();
+  }
+
   private int interval = CHECK_INTERVAL;
 
   /**
@@ -280,6 +500,8 @@
   }
 
   private void startAliveCheckExecutor() {
+    // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
+    // if it's not null.
     if (aliveCheckExecutor == null) {
       synchronized (this) {
         if (aliveCheckExecutor == null) {
@@ -292,13 +514,13 @@
     }
   }
 
-  private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrServer> lbHttpSolrServer) {
+  private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrServer> lbRef) {
     return new Runnable() {
       public void run() {
-        LBHttpSolrServer solrServer = lbHttpSolrServer.get();
-        if (solrServer != null && solrServer.zombieServers != null) {
-          for (ServerWrapper zombieServer : solrServer.zombieServers) {
-            solrServer.checkAZombieServer(zombieServer);
+        LBHttpSolrServer lb = lbRef.get();
+        if (lb != null && lb.zombieServers != null) {
+          for (ServerWrapper zombieServer : lb.zombieServers.values()) {
+            lb.checkAZombieServer(zombieServer);
           }
         }
       }
@@ -318,5 +540,7 @@
     }
   }
 
+  // defaults
   private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
+  private static final int NONSTANDARD_PING_LIMIT = 5;  // number of times we'll ping dead servers not in the server list
 }

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java Tue Jan 12 01:20:54 2010
@@ -2,15 +2,7 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 
 import junit.framework.TestCase;
 
@@ -50,6 +42,8 @@
   protected List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
   protected String context = "/solr";
   protected String shards;
+  protected String[] shardsArr;
+  protected String[] deadServers = {"localhost:33331/solr","localhost:33332/solr"};
   protected File testDir;
   protected SolrServer controlClient;
   protected int portSeed;
@@ -150,18 +144,43 @@
     controlJetty = createJetty(testDir, "control");
     controlClient = createNewSolrServer(controlJetty.getLocalPort());
 
+    shardsArr = new String[numShards];
     StringBuilder sb = new StringBuilder();
-    for (int i = 1; i <= numShards; i++) {
+    for (int i = 0; i < numShards; i++) {
       if (sb.length() > 0) sb.append(',');
       JettySolrRunner j = createJetty(testDir, "shard" + i);
       jettys.add(j);
       clients.add(createNewSolrServer(j.getLocalPort()));
-      sb.append("localhost:").append(j.getLocalPort()).append(context);
+      String shardStr = "localhost:" + j.getLocalPort() + context;
+      shardsArr[i] = shardStr;
+      sb.append(shardStr);
     }
 
     shards = sb.toString();
   }
 
+  protected String getShardsString() { // builds the "shards" request value, randomly padding each live shard with entries from deadServers
+    if (deadServers == null) return shards; // deadServers unset: use the plain comma-separated shards string
+    // otherwise emit one comma-separated entry per live shard
+    StringBuilder sb = new StringBuilder();
+    for (String shard : shardsArr) {
+      if (sb.length() > 0) sb.append(',');
+      int nDeadServers = r.nextInt(deadServers.length+1); // 0..deadServers.length inclusive
+      if (nDeadServers > 0) {
+        List<String> replicas = new ArrayList<String>(Arrays.asList(deadServers)); // copy into a resizable list so the live shard can be inserted
+        Collections.shuffle(replicas, r);
+        replicas.add(r.nextInt(nDeadServers+1), shard); // index <= nDeadServers, so the live shard is always among the entries emitted below
+        for (int i=0; i<nDeadServers+1; i++) { // emit the first nDeadServers+1 entries, separated by '|'
+          if (i!=0) sb.append('|');
+          sb.append(replicas.get(i));
+        }
+      } else {
+        sb.append(shard); // no dead entries this time: the live shard on its own
+      }
+    }
+    return sb.toString();
+  }
+
   protected void destroyServers() throws Exception {
     controlJetty.stop();
     for (JettySolrRunner jetty : jettys) jetty.stop();
@@ -276,7 +295,7 @@
     final QueryResponse controlRsp = controlClient.query(params);
 
     // query a random server
-    params.set("shards", shards);
+    params.set("shards", getShardsString());
     int which = r.nextInt(clients.size());
     SolrServer client = clients.get(which);
     QueryResponse rsp = client.query(params);

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java Tue Jan 12 01:20:54 2010
@@ -20,21 +20,21 @@
 import junit.framework.TestCase;
 import junit.framework.Assert;
 import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.util.AbstractSolrTestCase;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 /**
  * Test for LBHttpSolrServer
@@ -44,7 +44,7 @@
  */
 public class TestLBHttpSolrServer extends TestCase {
   SolrInstance[] solr = new SolrInstance[3];
-  HttpClient httpClient = new HttpClient();
+  HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager());
 
   public void setUp() throws Exception {
     for (int i = 0; i < solr.length; i++) {
@@ -78,48 +78,168 @@
   }
 
   public void testSimple() throws Exception {
-    String[] s = new String[solr.length];
+    LinkedList<String> serverList = new LinkedList<String>(); // LinkedList: rotated below with removeFirst/addLast
     for (int i = 0; i < solr.length; i++) {
-      s[i] = solr[i].getUrl();
+      serverList.add(solr[i].getUrl());
     }
-    LBHttpSolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient, s);
-    lbHttpSolrServer.setAliveCheckInterval(500);
+    String[] servers = serverList.toArray(new String[serverList.size()]);
+    // lb: simple API, server list given at construction. lb2: advanced API, server list passed with each Req.
+    LBHttpSolrServer lb = new LBHttpSolrServer(httpClient, servers);
+    lb.setAliveCheckInterval(500);
+    LBHttpSolrServer lb2 = new LBHttpSolrServer(httpClient);
+    lb2.setAliveCheckInterval(500);
+
+    // Stage 1: all servers up; one query per configured server, collecting the "name" of the first hit
     SolrQuery solrQuery = new SolrQuery("*:*");
+    SolrRequest solrRequest = new QueryRequest(solrQuery);
     Set<String> names = new HashSet<String>();
     QueryResponse resp = null;
-    for (String value : s) {
-      resp = lbHttpSolrServer.query(solrQuery);
+    for (String server : servers) { // loop variable unused: this just repeats the query servers.length times
+      resp = lb.query(solrQuery);
+      assertEquals(10, resp.getResults().getNumFound());
+      names.add(resp.getResults().get(0).getFieldValue("name").toString());
+    }
+    assertEquals(3, names.size()); // three distinct "name" values were seen
+
+    // Now test through the advanced API
+    names.clear();
+    for (String server : servers) {
+      LBHttpSolrServer.Rsp rsp = lb2.request(new LBHttpSolrServer.Req(solrRequest, serverList));
+      // make sure the response came from the first in the list
+      assertEquals(rsp.getServer(), serverList.getFirst()); // NOTE(review): JUnit's order is (expected, actual); as written only the failure message is affected
+      resp = new QueryResponse(rsp.getResponse(), lb);
       assertEquals(10, resp.getResults().getNumFound());
       names.add(resp.getResults().get(0).getFieldValue("name").toString());
+
+      // rotate the server list
+      serverList.addLast(serverList.removeFirst());
     }
     assertEquals(3, names.size());
 
+
     // Kill a server and test again
     solr[1].jetty.stop();
     solr[1].jetty = null;
     names.clear();
-    for (String value : s) {
-      resp = lbHttpSolrServer.query(solrQuery);
+    for (String server : servers) {
+      resp = lb.query(solrQuery);
       assertEquals(10, resp.getResults().getNumFound());
       names.add(resp.getResults().get(0).getFieldValue("name").toString());
     }
     assertEquals(2, names.size());
     assertFalse(names.contains("solr1"));
 
+
+    // Now test through the advanced API
+    names.clear();
+    for (String server : servers) {
+      LBHttpSolrServer.Rsp rsp = lb2.request(new LBHttpSolrServer.Req(solrRequest, serverList));
+      resp = new QueryResponse(rsp.getResponse(), lb);
+      assertFalse(rsp.getServer().contains("solr1")); // NOTE(review): getServer() is compared to serverList URLs elsewhere in this test; confirm those URLs contain the instance name, otherwise this check is vacuous
+      assertEquals(10, resp.getResults().getNumFound());
+      names.add(resp.getResults().get(0).getFieldValue("name").toString());
+
+      // rotate the server list
+      serverList.addLast(serverList.removeFirst());
+    }
+    assertEquals(2, names.size());
+    assertFalse(names.contains("solr1"));
+
     // Start the killed server once again
-    solr[1].startJetty();
+    solr[1].startJetty(true);
     // Wait for the alive check to complete
-    Thread.sleep(1200);
+    Thread.sleep(600); // just over the 500 passed to setAliveCheckInterval above (presumably milliseconds - confirm)
+    names.clear();
+    for (String value : servers) {
+      resp = lb.query(solrQuery);
+      assertEquals(10, resp.getResults().getNumFound());
+      names.add(resp.getResults().get(0).getFieldValue("name").toString());
+    }
+    // System.out.println("SERVERNAMES="+names);
+    assertEquals(3, names.size());
+
+    // Now test through the advanced API
     names.clear();
-    for (String value : s) {
-      resp = lbHttpSolrServer.query(solrQuery);
+    for (String server : servers) {
+      LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+      LBHttpSolrServer.Rsp rsp = lb2.request(req);
+      // make sure the response came from the first in the list
+      assertEquals(rsp.getServer(), serverList.getFirst()); // NOTE(review): arguments are in (actual, expected) order, as in the earlier advanced-API loop
+      resp = new QueryResponse(rsp.getResponse(), lb);
       assertEquals(10, resp.getResults().getNumFound());
       names.add(resp.getResults().get(0).getFieldValue("name").toString());
+
+      // rotate the server list
+      serverList.addLast(serverList.removeFirst());
     }
     assertEquals(3, names.size());
+
+    // Final stage: fresh load balancers with a very large alive-check interval, then every server is stopped
+    // slow LB for Simple API
+    LBHttpSolrServer slowLB = new LBHttpSolrServer(httpClient, servers);
+    slowLB.setAliveCheckInterval(1000000000);
+
+    // slow LB for advanced API
+    LBHttpSolrServer slowLB2 = new LBHttpSolrServer(httpClient);
+    slowLB2.setAliveCheckInterval(1000000000);
+
+    // stop all solr servers
+    for (SolrInstance solrInstance : solr) {
+      solrInstance.jetty.stop();
+      solrInstance.jetty = null;
+    }
+
+    try {
+      resp = slowLB.query(solrQuery);
+      TestCase.fail(); // all servers should be down
+    } catch (SolrServerException e) {
+      // expected
+    }
+
+    try {
+      LBHttpSolrServer.Rsp rsp = slowLB2.request(new LBHttpSolrServer.Req(solrRequest, serverList));
+      TestCase.fail(); // all servers should be down
+    } catch (SolrServerException e) {
+      // expected
+    }
+
+    // Restart only solr[1]; the other instances stay stopped
+    solr[1].startJetty(true);
+
+    // even though one of the servers is now up, the loadbalancer won't yet know this unless we ask
+    // it to try dead servers.
+    try {
+      LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+      req.setNumDeadServersToTry(0);
+      LBHttpSolrServer.Rsp rsp = slowLB2.request(req);
+      TestCase.fail(); // all servers still should be marked as down
+    } catch (SolrServerException e) {
+      // expected
+    }
+
+    // the default is to try dead servers if there are no live servers (simple API)
+    {
+      resp = slowLB.query(solrQuery); // no assertion on the result: the check is that this does not throw
+    }
+
+    // the default is to try dead servers if there are no live servers (advanced API)
+    {
+      LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+      LBHttpSolrServer.Rsp rsp = slowLB2.request(req); // no assertion on the result: the check is that this does not throw
+    }
+
+    // the last success should have removed the server from the dead server list, so
+    // the next request should succeed even if it doesn't try dead servers.
+    {
+      LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+      req.setNumDeadServersToTry(0);
+      LBHttpSolrServer.Rsp rsp = slowLB2.request(req);
+    }
+
   }
 
-  public void testTwoServers() throws Exception {
+  // this test is a subset of testSimple and is no longer needed
+  public void XtestTwoServers() throws Exception {
     LBHttpSolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient, solr[0].getUrl(), solr[1].getUrl());
     lbHttpSolrServer.setAliveCheckInterval(500);
     SolrQuery solrQuery = new SolrQuery("*:*");
@@ -135,15 +255,9 @@
     Assert.assertEquals("solr1", name);
     solr[1].jetty.stop();
     solr[1].jetty = null;
-    solr[0].startJetty();
-    Thread.sleep(1200);
-    try {
-      resp = lbHttpSolrServer.query(solrQuery);
-    } catch(SolrServerException e) {
-      // try again after a pause in case the error is lack of time to start server
-      Thread.sleep(3000);
-      resp = lbHttpSolrServer.query(solrQuery);
-    }
+    solr[0].startJetty(true);
+    Thread.sleep(600);
+    resp = lbHttpSolrServer.query(solrQuery);
     name = resp.getResults().get(0).getFieldValue("name").toString();
     Assert.assertEquals("solr0", name);
   }
@@ -217,10 +331,15 @@
     }
 
     public void startJetty() throws Exception {
+      startJetty(false); // delegates to startJetty(boolean waitUntilUp) with waitUntilUp = false
+    }
+
+
+    public void startJetty(boolean waitUntilUp) throws Exception {
       jetty = new JettySolrRunner("/solr", port);
       System.setProperty("solr.solr.home", getHomeDir());
       System.setProperty("solr.data.dir", getDataDir());
-      jetty.start();
+      jetty.start(waitUntilUp);
       int newPort = jetty.getLocalPort();
       if (port != 0 && newPort != port) {
         TestCase.fail("TESTING FAILURE: could not grab requested port.");

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Tue Jan 12 01:20:54 2010
@@ -36,6 +36,9 @@
     // we don't call super.setUp
     log.info("####SETUP_START " + getName());
     portSeed = 13000;
+
+    // TODO: HACK: inserting dead servers doesn't currently work with these tests
+    deadServers = null;
     
     System.setProperty("zkHost", AbstractZkTestCase.ZOO_KEEPER_ADDRESS);
     String zkDir = tmpDir.getAbsolutePath() + File.separator