You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by yo...@apache.org on 2010/01/12 02:20:55 UTC
svn commit: r898144 - in /lucene/solr/branches/cloud/src:
java/org/apache/solr/handler/component/
solrj/org/apache/solr/client/solrj/impl/ test/org/apache/solr/
test/org/apache/solr/client/solrj/ test/org/apache/solr/cloud/
Author: yonik
Date: Tue Jan 12 01:20:54 2010
New Revision: 898144
URL: http://svn.apache.org/viewvc?rev=898144&view=rev
Log:
SOLR-1698: load balanced distrib search
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java
lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java
lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java
lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/SearchHandler.java Tue Jan 12 01:20:54 2010
@@ -21,6 +21,7 @@
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.RTimer;
import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@@ -33,6 +34,7 @@
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.apache.solr.core.SolrCore;
import org.apache.lucene.queryParser.ParseException;
@@ -43,6 +45,7 @@
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
+import java.net.MalformedURLException;
/**
*
@@ -353,6 +356,8 @@
static HttpClient client;
+ static Random r = new Random();
+ static LBHttpSolrServer loadbalancer;
static {
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
@@ -361,12 +366,23 @@
mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
// mgr.getParams().setStaleCheckingEnabled(false);
- client = new HttpClient(mgr);
+ client = new HttpClient(mgr);
+ try {
+ loadbalancer = new LBHttpSolrServer(client);
+ } catch (MalformedURLException e) {
+ // should be impossible since we're not passing any URLs here
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e);
+ }
}
CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
+ // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+ // This is primarily to keep track of what order we should use to query the replicas of a shard
+ // so that we use the same replica for all phases of a distributed request.
+ Map<String,List<String>> shardToURLs = new HashMap<String,List<String>>();
+
HttpCommComponent() {
}
@@ -390,7 +406,36 @@
}
}
+
+ // Not thread safe... don't use in Callable.
+ // Don't modify the returned URL list.
+ private List<String> getURLs(String shard) {
+ List<String> urls = shardToURLs.get(shard);
+ if (urls==null) {
+ urls = StrUtils.splitSmart(shard,"|",true);
+
+ // convert shard to URL
+ for (int i=0; i<urls.size(); i++) {
+ urls.set(i, "http://"+urls.get(i));
+ }
+
+ //
+ // Shuffle the list instead of using round-robin by default.
+ // This prevents accidental synchronization where multiple shards could get in sync
+ // and query the same replica at the same time.
+ //
+ if (urls.size() > 1)
+ Collections.shuffle(urls, r);
+ shardToURLs.put(shard, urls);
+ }
+ return urls;
+ }
+
+
void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
+ // do this outside of the callable for thread safety reasons
+ final List<String> urls = getURLs(shard);
+
Callable<ShardResponse> task = new Callable<ShardResponse>() {
public ShardResponse call() throws Exception {
@@ -402,13 +447,9 @@
long startTime = System.currentTimeMillis();
try {
- // String url = "http://" + shard + "/select";
- String url = "http://" + shard;
-
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
- SolrServer server = new CommonsHttpSolrServer(url, client);
// SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
// use generic request to avoid extra processing of queries
QueryRequest req = new QueryRequest(params);
@@ -416,10 +457,17 @@
// no need to set the response parser as binary is the default
// req.setResponseParser(new BinaryResponseParser());
- // srsp.rsp = server.request(req);
- // srsp.rsp = server.query(sreq.params);
- ssr.nl = server.request(req);
+ if (urls.size() <= 1) {
+ String url = urls.get(0);
+ srsp.setShardAddress(url);
+ SolrServer server = new CommonsHttpSolrServer(url, client);
+ ssr.nl = server.request(req);
+ } else {
+ LBHttpSolrServer.Rsp rsp = loadbalancer.request(new LBHttpSolrServer.Req(req, urls));
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ }
} catch (Throwable th) {
srsp.setException(th);
if (th instanceof SolrException) {
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/ShardResponse.java Tue Jan 12 01:20:54 2010
@@ -20,63 +20,68 @@
import org.apache.solr.common.SolrException;
public final class ShardResponse {
- private ShardRequest req;
- private String shard;
- private String shardAddress; // the specific shard that this response was received from
- private int rspCode;
- private Throwable exception;
- private SolrResponse rsp;
-
- public String toString() {
- return "ShardResponse:{shard="+shard+",shardAddress="+shardAddress
- +"\n\trequest=" + req
- +"\n\tresponse=" + rsp
- + (exception==null ? "" : "\n\texception="+ SolrException.toStr(exception))
- +"\n}";
- }
-
- public Throwable getException()
- {
- return exception;
- }
-
- public ShardRequest getShardRequest()
- {
- return req;
- }
-
- public SolrResponse getSolrResponse()
- {
- return rsp;
- }
-
- public String getShard()
- {
- return shard;
- }
-
- void setShardRequest(ShardRequest rsp)
- {
- this.req = rsp;
- }
-
- void setSolrResponse(SolrResponse rsp)
- {
- this.rsp = rsp;
- }
-
- void setShard(String shard)
- {
- this.shard = shard;
- }
-
- void setException(Throwable exception)
- {
- this.exception = exception;
- }
-
- void setResponseCode(int rspCode)
- {
- this.rspCode = rspCode;
- }
+ private ShardRequest req;
+ private String shard;
+ private String shardAddress; // the specific shard that this response was received from
+ private int rspCode;
+ private Throwable exception;
+ private SolrResponse rsp;
+
+ public String toString() {
+ return "ShardResponse:{shard="+shard+",shardAddress="+shardAddress
+ +"\n\trequest=" + req
+ +"\n\tresponse=" + rsp
+ + (exception==null ? "" : "\n\texception="+ SolrException.toStr(exception))
+ +"\n}";
+ }
+
+ public Throwable getException()
+ {
+ return exception;
+ }
+
+ public ShardRequest getShardRequest()
+ {
+ return req;
+ }
+
+ public SolrResponse getSolrResponse()
+ {
+ return rsp;
+ }
+
+ public String getShard()
+ {
+ return shard;
+ }
+
+ void setShardRequest(ShardRequest rsp)
+ {
+ this.req = rsp;
+ }
+
+ void setSolrResponse(SolrResponse rsp)
+ {
+ this.rsp = rsp;
+ }
+
+ void setShard(String shard)
+ {
+ this.shard = shard;
+ }
+
+ void setException(Throwable exception)
+ {
+ this.exception = exception;
+ }
+
+ void setResponseCode(int rspCode)
+ {
+ this.rspCode = rspCode;
+ }
+
+ /** What was the shard address that returned this response. Example: "http://localhost:8983/solr" */
+ public String getShardAddress() { return this.shardAddress; }
+
+ void setShardAddress(String addr) { this.shardAddress = addr; }
}
Modified: lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Tue Jan 12 01:20:54 2010
@@ -27,26 +27,24 @@
import java.lang.ref.WeakReference;
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.*;
/**
- * LBHttpSolrServer or "LoadBalanced HttpSolrServer" is just a wrapper to CommonsHttpSolrServer. This is useful when you
+ * LBHttpSolrServer or "LoadBalanced HttpSolrServer" is a load balancing wrapper to CommonsHttpSolrServer. This is useful when you
* have multiple SolrServers and the requests need to be Load Balanced among them. This should <b>NOT</b> be used for
* indexing. Also see the <a href="http://wiki.apache.org/solr/LBHttpSolrServer">wiki</a> page.
* <p/>
* It offers automatic failover when a server goes down and it detects when the server comes back up.
* <p/>
- * Load balancing is done using a simple roundrobin on the list of servers.
+ * Load balancing is done using a simple round-robin on the list of servers.
* <p/>
* If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
* off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
* This process is continued till it tries all the live servers. If atleast one server is alive, the request succeeds,
- * andif not it fails.
+ * and if not it fails.
* <blockquote><pre>
* SolrServer lbHttpSolrServer = new LBHttpSolrServer("http://host1:8080/solr/","http://host2:8080/solr","http://host2:8080/solr");
* //or if you wish to pass the HttpClient do as follows
@@ -57,23 +55,33 @@
* This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
* <p/>
* <b>When to use this?</b><br/> This can be used as a software load balancer when you do not wish to setup an external
- * load balancer. The code is relatively new and the API is currently experimental. Alternatives to this code are to use
+ * load balancer. Alternatives to this code are to use
* a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
* href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
*
- * @version $Id$
* @since solr 1.4
*/
public class LBHttpSolrServer extends SolrServer {
- private final CopyOnWriteArrayList<ServerWrapper> aliveServers = new CopyOnWriteArrayList<ServerWrapper>();
- private final CopyOnWriteArrayList<ServerWrapper> zombieServers = new CopyOnWriteArrayList<ServerWrapper>();
+
+
+ // keys to the maps are currently of the form "http://localhost:8983/solr"
+ // which should be equivalent to CommonsHttpSolrServer.getBaseURL()
+ private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<String, ServerWrapper>();
+ // access to aliveServers should be synchronized on itself
+
+ private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<String, ServerWrapper>();
+
+ // changes to aliveServers are reflected in this array, no need to synchronize
+ private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
+
+
private ScheduledExecutorService aliveCheckExecutor;
private HttpClient httpClient;
private final AtomicInteger counter = new AtomicInteger(-1);
- private ReentrantLock checkLock = new ReentrantLock();
private static final SolrQuery solrQuery = new SolrQuery("*:*");
+ private static final BinaryResponseParser binaryParser = new BinaryResponseParser();
static {
solrQuery.setRows(0);
@@ -82,8 +90,13 @@
private static class ServerWrapper {
final CommonsHttpSolrServer solrServer;
- // Used only by the thread in aliveCheckExecutor
- long lastUsed, lastChecked;
+ long lastUsed; // last time used for a real request
+ long lastChecked; // last time checked for liveness
+
+ // "standard" servers are used by default. They normally live in the alive list
+ // and move to the zombie list when unavailable. When they become available again,
+ // they move back to the alive list.
+ boolean standard = true;
int failedPings = 0;
@@ -94,33 +107,220 @@
public String toString() {
return solrServer.getBaseURL();
}
+
+ public String getKey() {
+ return solrServer.getBaseURL();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getKey().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof ServerWrapper)) return false;
+ return this.getKey().equals(((ServerWrapper)obj).getKey());
+ }
+ }
+
+ public static class Req {
+ protected SolrRequest request;
+ protected List<String> servers;
+ protected int numDeadServersToTry;
+
+ public Req(SolrRequest request, List<String> servers) {
+ this.request = request;
+ this.servers = servers;
+ this.numDeadServersToTry = servers.size();
+ }
+
+ public SolrRequest getRequest() {
+ return request;
+ }
+ public List<String> getServers() {
+ return servers;
+ }
+
+ /** @return the number of dead servers to try if there are no live servers left */
+ public int getNumDeadServersToTry() {
+ return numDeadServersToTry;
+ }
+
+ /** @param numDeadServersToTry The number of dead servers to try if there are no live servers left.
+ * Defaults to the number of servers in this request. */
+ public void setNumDeadServersToTry(int numDeadServersToTry) {
+ this.numDeadServersToTry = numDeadServersToTry;
+ }
+ }
+
+ public static class Rsp {
+ protected String server;
+ protected NamedList<Object> rsp;
+
+ /** The response from the server */
+ public NamedList<Object> getResponse() {
+ return rsp;
+ }
+
+ /** The server that returned the response */
+ public String getServer() {
+ return server;
+ }
}
public LBHttpSolrServer(String... solrServerUrls) throws MalformedURLException {
this(new HttpClient(new MultiThreadedHttpConnectionManager()), solrServerUrls);
}
+ /** The provided httpClient should use a multi-threaded connection manager */
public LBHttpSolrServer(HttpClient httpClient, String... solrServerUrl)
throws MalformedURLException {
this(httpClient, new BinaryResponseParser(), solrServerUrl);
}
+ /** The provided httpClient should use a multi-threaded connection manager */
public LBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl)
throws MalformedURLException {
this.httpClient = httpClient;
for (String s : solrServerUrl) {
- aliveServers.add(new ServerWrapper(new CommonsHttpSolrServer(s, httpClient, parser)));
+ ServerWrapper wrapper = new ServerWrapper(new CommonsHttpSolrServer(s, httpClient, parser));
+ aliveServers.put(wrapper.getKey(), wrapper);
+ }
+ updateAliveList();
+ }
+
+ public static String normalize(String server) {
+ if (server.endsWith("/"))
+ server = server.substring(0, server.length() - 1);
+ return server;
+ }
+
+ protected CommonsHttpSolrServer makeServer(String server) throws MalformedURLException {
+ return new CommonsHttpSolrServer(server, httpClient, binaryParser);
+ }
+
+
+
+ /**
+ * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
+ * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
+ * time, or until a test request on that server succeeds.
+ *
+ * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
+ * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
+ * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
+ *
+ * If no live servers are found a SolrServerException is thrown.
+ *
+ * @param req contains both the request as well as the list of servers to query
+ *
+ * @return the result of the request
+ *
+ * @throws SolrServerException
+ * @throws IOException
+ */
+ public Rsp request(Req req) throws SolrServerException, IOException {
+ Rsp rsp = new Rsp();
+ Exception ex = null;
+
+ List<ServerWrapper> skipped = new ArrayList<ServerWrapper>(req.getNumDeadServersToTry());
+
+ for (String serverStr : req.getServers()) {
+ serverStr = normalize(serverStr);
+ // if the server is currently a zombie, just skip to the next one
+ ServerWrapper wrapper = zombieServers.get(serverStr);
+ if (wrapper != null) {
+ // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
+ if (skipped.size() < req.getNumDeadServersToTry())
+ skipped.add(wrapper);
+ continue;
+ }
+ rsp.server = serverStr;
+ CommonsHttpSolrServer server = makeServer(serverStr);
+
+ try {
+ rsp.rsp = server.request(req.getRequest());
+ return rsp; // SUCCESS
+ } catch (SolrException e) {
+ // Server is alive but the request was malformed or invalid
+ throw e;
+ } catch (SolrServerException e) {
+ if (e.getRootCause() instanceof IOException) {
+ ex = e;
+ wrapper = new ServerWrapper(server);
+ wrapper.lastUsed = System.currentTimeMillis();
+ wrapper.standard = false;
+ zombieServers.put(wrapper.getKey(), wrapper);
+ startAliveCheckExecutor();
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new SolrServerException(e);
+ }
+ }
+
+ // try the servers we previously skipped
+ for (ServerWrapper wrapper : skipped) {
+ try {
+ rsp.rsp = wrapper.solrServer.request(req.getRequest());
+ zombieServers.remove(wrapper.getKey());
+ return rsp; // SUCCESS
+ } catch (SolrException e) {
+ // Server is alive but the request was malformed or invalid
+ zombieServers.remove(wrapper.getKey());
+ throw e;
+ } catch (SolrServerException e) {
+ if (e.getRootCause() instanceof IOException) {
+ ex = e;
+ // already a zombie, no need to re-add
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new SolrServerException(e);
+ }
+ }
+
+
+ if (ex == null) {
+ throw new SolrServerException("No live SolrServers available to handle this request");
+ } else {
+ throw new SolrServerException("No live SolrServers available to handle this request", ex);
+ }
+
+ }
+
+
+
+ private void updateAliveList() {
+ synchronized (aliveServers) {
+ aliveServerList = aliveServers.values().toArray(new ServerWrapper[aliveServers.size()]);
+ }
+ }
+
+ private ServerWrapper removeFromAlive(String key) {
+ synchronized (aliveServers) {
+ ServerWrapper wrapper = aliveServers.remove(key);
+ if (wrapper != null)
+ updateAliveList();
+ return wrapper;
+ }
+ }
+
+ private void addToAlive(ServerWrapper wrapper) {
+ synchronized (aliveServers) {
+ ServerWrapper prev = aliveServers.put(wrapper.getKey(), wrapper);
+ // TODO: warn if there was a previous entry?
+ updateAliveList();
}
}
public void addSolrServer(String server) throws MalformedURLException {
CommonsHttpSolrServer solrServer = new CommonsHttpSolrServer(server, httpClient);
- checkLock.lock();
- try {
- aliveServers.add(new ServerWrapper(solrServer));
- } finally {
- checkLock.unlock();
- }
+ addToAlive(new ServerWrapper(solrServer));
}
public String removeSolrServer(String server) {
@@ -132,25 +332,11 @@
if (server.endsWith("/")) {
server = server.substring(0, server.length() - 1);
}
- this.checkLock.lock();
- try {
- for (ServerWrapper serverWrapper : aliveServers) {
- if (serverWrapper.solrServer.getBaseURL().equals(server)) {
- aliveServers.remove(serverWrapper);
- return serverWrapper.solrServer.getBaseURL();
- }
- }
- if (zombieServers.isEmpty()) return null;
- for (ServerWrapper serverWrapper : zombieServers) {
- if (serverWrapper.solrServer.getBaseURL().equals(server)) {
- zombieServers.remove(serverWrapper);
- return serverWrapper.solrServer.getBaseURL();
- }
- }
- } finally {
- checkLock.unlock();
- }
+ // there is a small race condition here - if the server is in the process of being moved between
+ // lists, we could fail to remove it.
+ removeFromAlive(server);
+ zombieServers.remove(server);
return null;
}
@@ -174,9 +360,10 @@
}
/**
- * Tries to query a live server. If no live servers are found it throws a SolrServerException. If the request failed
- * due to IOException then the live server is moved to dead pool and the request is retried on another live server if
- * available. If all live servers are exhausted then a SolrServerException is thrown.
+ * Tries to query a live server. A SolrServerException is thrown if all servers are dead.
+ * If the request failed due to IOException then the live server is moved to dead pool and the request is
+ * retried on another live server. After live servers are exhausted, any servers previously marked as dead
+ * will be tried before failing the request.
*
* @param request the SolrRequest.
*
@@ -187,41 +374,69 @@
*/
public NamedList<Object> request(final SolrRequest request)
throws SolrServerException, IOException {
- int count = counter.incrementAndGet();
- int attempts = 0;
- Exception ex;
- int startSize = aliveServers.size();
- while (true) {
- int size = aliveServers.size();
- if (size < 1) throw new SolrServerException("No live SolrServers available to handle this request");
- ServerWrapper solrServer;
+ Exception ex = null;
+ ServerWrapper[] serverList = aliveServerList;
+
+ int maxTries = serverList.length;
+ Map<String,ServerWrapper> justFailed = null;
+
+ for (int attempts=0; attempts<maxTries; attempts++) {
+ int count = counter.incrementAndGet();
+ ServerWrapper wrapper = serverList[count % serverList.length];
+ wrapper.lastUsed = System.currentTimeMillis();
+
try {
- solrServer = aliveServers.get(count % size);
- } catch (IndexOutOfBoundsException e) {
- //this list changes dynamically. so it is expected to get IndexOutOfBoundsException
- continue;
+ return wrapper.solrServer.request(request);
+ } catch (SolrException e) {
+ // Server is alive but the request was malformed or invalid
+ throw e;
+ } catch (SolrServerException e) {
+ if (e.getRootCause() instanceof IOException) {
+ ex = e;
+ moveAliveToDead(wrapper);
+ if (justFailed == null) justFailed = new HashMap<String,ServerWrapper>();
+ justFailed.put(wrapper.getKey(), wrapper);
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new SolrServerException(e);
}
+ }
+
+
+ // try other standard servers that we didn't try just now
+ for (ServerWrapper wrapper : zombieServers.values()) {
+ if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue;
try {
- return solrServer.solrServer.request(request);
+ NamedList<Object> rsp = wrapper.solrServer.request(request);
+ // remove from zombie list *before* adding to alive to avoid a race that could lose a server
+ zombieServers.remove(wrapper.getKey());
+ addToAlive(wrapper);
+ return rsp;
} catch (SolrException e) {
// Server is alive but the request was malformed or invalid
throw e;
} catch (SolrServerException e) {
if (e.getRootCause() instanceof IOException) {
ex = e;
- moveAliveToDead(solrServer);
+ // still dead
} else {
throw e;
}
} catch (Exception e) {
throw new SolrServerException(e);
}
- attempts++;
- if (attempts >= startSize)
- throw new SolrServerException("No live SolrServers available to handle this request", ex);
}
- }
+
+ if (ex == null) {
+ throw new SolrServerException("No live SolrServers available to handle this request");
+ } else {
+ throw new SolrServerException("No live SolrServers available to handle this request", ex);
+ }
+ }
+
/**
* Takes up one dead server and check for aliveness. The check is done in a roundrobin. Each server is checked for
* aliveness once in 'x' millis where x is decided by the setAliveCheckinterval() or it is defaulted to 1 minute
@@ -230,39 +445,44 @@
*/
private void checkAZombieServer(ServerWrapper zombieServer) {
long currTime = System.currentTimeMillis();
- checkLock.lock();
try {
zombieServer.lastChecked = currTime;
QueryResponse resp = zombieServer.solrServer.query(solrQuery);
if (resp.getStatus() == 0) {
- //server has come back up
- zombieServer.lastUsed = currTime;
- zombieServers.remove(zombieServer);
- aliveServers.add(zombieServer);
- zombieServer.failedPings = 0;
+ // server has come back up.
+ // make sure to remove from zombies before adding to alive to avoid a race condition
+ // where another thread could mark it down, move it back to zombie, and then we delete
+ // from zombie and lose it forever.
+ ServerWrapper wrapper = zombieServers.remove(zombieServer.getKey());
+ if (wrapper != null) {
+ wrapper.failedPings = 0;
+ if (wrapper.standard) {
+ addToAlive(wrapper);
+ }
+ } else {
+ // something else already moved the server from zombie to alive
+ }
}
} catch (Exception e) {
+ //Expected. The server is still down.
zombieServer.failedPings++;
- //Expected . The server is still down
- } finally {
- checkLock.unlock();
- }
- }
- private void moveAliveToDead(ServerWrapper solrServer) {
- checkLock.lock();
- try {
- boolean result = aliveServers.remove(solrServer);
- if (result) {
- if (zombieServers.addIfAbsent(solrServer)) {
- startAliveCheckExecutor();
- }
+ // If the server doesn't belong in the standard set belonging to this load balancer
+ // then simply drop it after a certain number of failed pings.
+ if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
+ zombieServers.remove(zombieServer.getKey());
}
- } finally {
- checkLock.unlock();
}
}
+ private void moveAliveToDead(ServerWrapper wrapper) {
+ wrapper = removeFromAlive(wrapper.getKey());
+ if (wrapper == null)
+ return; // another thread already detected the failure and removed it
+ zombieServers.put(wrapper.getKey(), wrapper);
+ startAliveCheckExecutor();
+ }
+
private int interval = CHECK_INTERVAL;
/**
@@ -280,6 +500,8 @@
}
private void startAliveCheckExecutor() {
+ // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
+ // if it's not null.
if (aliveCheckExecutor == null) {
synchronized (this) {
if (aliveCheckExecutor == null) {
@@ -292,13 +514,13 @@
}
}
- private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrServer> lbHttpSolrServer) {
+ private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrServer> lbRef) {
return new Runnable() {
public void run() {
- LBHttpSolrServer solrServer = lbHttpSolrServer.get();
- if (solrServer != null && solrServer.zombieServers != null) {
- for (ServerWrapper zombieServer : solrServer.zombieServers) {
- solrServer.checkAZombieServer(zombieServer);
+ LBHttpSolrServer lb = lbRef.get();
+ if (lb != null && lb.zombieServers != null) {
+ for (ServerWrapper zombieServer : lb.zombieServers.values()) {
+ lb.checkAZombieServer(zombieServer);
}
}
}
@@ -318,5 +540,7 @@
}
}
+ // defaults
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
+ private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
}
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/BaseDistributedSearchTestCase.java Tue Jan 12 01:20:54 2010
@@ -2,15 +2,7 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
import junit.framework.TestCase;
@@ -50,6 +42,8 @@
protected List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
protected String context = "/solr";
protected String shards;
+ protected String[] shardsArr;
+ protected String[] deadServers = {"localhost:33331/solr","localhost:33332/solr"};
protected File testDir;
protected SolrServer controlClient;
protected int portSeed;
@@ -150,18 +144,43 @@
controlJetty = createJetty(testDir, "control");
controlClient = createNewSolrServer(controlJetty.getLocalPort());
+ shardsArr = new String[numShards];
StringBuilder sb = new StringBuilder();
- for (int i = 1; i <= numShards; i++) {
+ for (int i = 0; i < numShards; i++) {
if (sb.length() > 0) sb.append(',');
JettySolrRunner j = createJetty(testDir, "shard" + i);
jettys.add(j);
clients.add(createNewSolrServer(j.getLocalPort()));
- sb.append("localhost:").append(j.getLocalPort()).append(context);
+ String shardStr = "localhost:" + j.getLocalPort() + context;
+ shardsArr[i] = shardStr;
+ sb.append(shardStr);
}
shards = sb.toString();
}
+ protected String getShardsString() {
+ if (deadServers == null) return shards;
+
+ StringBuilder sb = new StringBuilder();
+ for (String shard : shardsArr) {
+ if (sb.length() > 0) sb.append(',');
+ int nDeadServers = r.nextInt(deadServers.length+1);
+ if (nDeadServers > 0) {
+ List<String> replicas = new ArrayList<String>(Arrays.asList(deadServers));
+ Collections.shuffle(replicas, r);
+ replicas.add(r.nextInt(nDeadServers+1), shard);
+ for (int i=0; i<nDeadServers+1; i++) {
+ if (i!=0) sb.append('|');
+ sb.append(replicas.get(i));
+ }
+ } else {
+ sb.append(shard);
+ }
+ }
+ return sb.toString();
+ }
+
protected void destroyServers() throws Exception {
controlJetty.stop();
for (JettySolrRunner jetty : jettys) jetty.stop();
@@ -276,7 +295,7 @@
final QueryResponse controlRsp = controlClient.query(params);
// query a random server
- params.set("shards", shards);
+ params.set("shards", getShardsString());
int which = r.nextInt(clients.size());
SolrServer client = clients.get(which);
QueryResponse rsp = client.query(params);
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java Tue Jan 12 01:20:54 2010
@@ -20,21 +20,21 @@
import junit.framework.TestCase;
import junit.framework.Assert;
import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.util.AbstractSolrTestCase;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
/**
* Test for LBHttpSolrServer
@@ -44,7 +44,7 @@
*/
public class TestLBHttpSolrServer extends TestCase {
SolrInstance[] solr = new SolrInstance[3];
- HttpClient httpClient = new HttpClient();
+ HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager());
public void setUp() throws Exception {
for (int i = 0; i < solr.length; i++) {
@@ -78,48 +78,168 @@
}
public void testSimple() throws Exception {
+ // Exercises LBHttpSolrServer via query() (simple API) and request(Req) (advanced API) while servers are stopped and restarted.
- String[] s = new String[solr.length];
+ LinkedList<String> serverList = new LinkedList<String>();
for (int i = 0; i < solr.length; i++) {
- s[i] = solr[i].getUrl();
+ serverList.add(solr[i].getUrl());
}
- LBHttpSolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient, s);
- lbHttpSolrServer.setAliveCheckInterval(500);
+ String[] servers = serverList.toArray(new String[serverList.size()]);
+ // lb is given the server list up front (simple API); lb2 has none and receives it per request via Req (advanced API)
+ LBHttpSolrServer lb = new LBHttpSolrServer(httpClient, servers);
+ lb.setAliveCheckInterval(500);
+ LBHttpSolrServer lb2 = new LBHttpSolrServer(httpClient);
+ lb2.setAliveCheckInterval(500);
+ // Each query is expected to match 10 docs; the first doc's "name" field (e.g. "solr1")
+ // identifies which server answered, so the number of distinct names == distinct servers hit.
SolrQuery solrQuery = new SolrQuery("*:*");
+ SolrRequest solrRequest = new QueryRequest(solrQuery);
Set<String> names = new HashSet<String>();
QueryResponse resp = null;
- for (String value : s) {
- resp = lbHttpSolrServer.query(solrQuery);
+ for (String server : servers) {
+ resp = lb.query(solrQuery);
+ assertEquals(10, resp.getResults().getNumFound());
+ names.add(resp.getResults().get(0).getFieldValue("name").toString());
+ }
+ assertEquals(3, names.size());
+
+ // Now test through the advanced API; serverList is rotated after each request so every server is first once
+ names.clear();
+ for (String server : servers) {
+ LBHttpSolrServer.Rsp rsp = lb2.request(new LBHttpSolrServer.Req(solrRequest, serverList));
+ // make sure the response came from the first in the list (NOTE(review): args are (actual, expected); JUnit's order is (expected, actual))
+ assertEquals(rsp.getServer(), serverList.getFirst());
+ resp = new QueryResponse(rsp.getResponse(), lb);
assertEquals(10, resp.getResults().getNumFound());
names.add(resp.getResults().get(0).getFieldValue("name").toString());
+
+ // rotate the server list
+ serverList.addLast(serverList.removeFirst());
}
assertEquals(3, names.size());
+
// Kill a server and test again
solr[1].jetty.stop();
solr[1].jetty = null;
names.clear();
- for (String value : s) {
- resp = lbHttpSolrServer.query(solrQuery);
+ for (String server : servers) {
+ resp = lb.query(solrQuery);
assertEquals(10, resp.getResults().getNumFound());
names.add(resp.getResults().get(0).getFieldValue("name").toString());
}
assertEquals(2, names.size());
assertFalse(names.contains("solr1"));
+
+ // Now test through the advanced API; no response may come from the stopped solr1
+ names.clear();
+ for (String server : servers) {
+ LBHttpSolrServer.Rsp rsp = lb2.request(new LBHttpSolrServer.Req(solrRequest, serverList));
+ resp = new QueryResponse(rsp.getResponse(), lb);
+ assertFalse(rsp.getServer().contains("solr1"));
+ assertEquals(10, resp.getResults().getNumFound());
+ names.add(resp.getResults().get(0).getFieldValue("name").toString());
+
+ // rotate the server list
+ serverList.addLast(serverList.removeFirst());
+ }
+ assertEquals(2, names.size());
+ assertFalse(names.contains("solr1"));
+
// Start the killed server once again
- solr[1].startJetty();
+ solr[1].startJetty(true);
// Wait for the alive check to complete
- Thread.sleep(1200);
+ Thread.sleep(600); // longer than the 500 ms alive-check interval set on lb and lb2 above
+ names.clear();
+ for (String value : servers) {
+ resp = lb.query(solrQuery);
+ assertEquals(10, resp.getResults().getNumFound());
+ names.add(resp.getResults().get(0).getFieldValue("name").toString());
+ }
+ // System.out.println("SERVERNAMES="+names);
+ assertEquals(3, names.size());
+
+ // Now test through the advanced API; with solr1 back, the first server in the list should answer
names.clear();
- for (String value : s) {
- resp = lbHttpSolrServer.query(solrQuery);
+ for (String server : servers) {
+ LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+ LBHttpSolrServer.Rsp rsp = lb2.request(req);
+ // make sure the response came from the first in the list
+ assertEquals(rsp.getServer(), serverList.getFirst());
+ resp = new QueryResponse(rsp.getResponse(), lb);
assertEquals(10, resp.getResults().getNumFound());
names.add(resp.getResults().get(0).getFieldValue("name").toString());
+
+ // rotate the server list
+ serverList.addLast(serverList.removeFirst());
}
assertEquals(3, names.size());
+ // The "slow" LBs below use an alive-check interval of 1,000,000,000 ms, so (presumably) no periodic
+ // check revives a dead server during the rest of this test; only requests that try dead servers can.
+ // slow LB for Simple API
+ LBHttpSolrServer slowLB = new LBHttpSolrServer(httpClient, servers);
+ slowLB.setAliveCheckInterval(1000000000);
+
+ // slow LB for advanced API
+ LBHttpSolrServer slowLB2 = new LBHttpSolrServer(httpClient);
+ slowLB2.setAliveCheckInterval(1000000000);
+
+ // stop all solr servers
+ for (SolrInstance solrInstance : solr) {
+ solrInstance.jetty.stop();
+ solrInstance.jetty = null;
+ }
+
+ try {
+ resp = slowLB.query(solrQuery);
+ TestCase.fail(); // all servers should be down
+ } catch (SolrServerException e) {
+ // expected
+ }
+
+ try {
+ LBHttpSolrServer.Rsp rsp = slowLB2.request(new LBHttpSolrServer.Req(solrRequest, serverList));
+ TestCase.fail(); // all servers should be down
+ } catch (SolrServerException e) {
+ // expected
+ }
+
+ // Restart only solr[1]; solr[0] and solr[2] stay stopped
+ solr[1].startJetty(true);
+
+ // even though one of the servers is now up, the loadbalancer won't yet know this unless we ask
+ // it to try dead servers.
+ try {
+ LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+ req.setNumDeadServersToTry(0);
+ LBHttpSolrServer.Rsp rsp = slowLB2.request(req);
+ TestCase.fail(); // all servers still should be marked as down
+ } catch (SolrServerException e) {
+ // expected
+ }
+
+ // Simple API: by default dead servers are tried when there are no live servers, so this must not throw
+ {
+ resp = slowLB.query(solrQuery);
+ }
+
+ // Advanced API: same default (dead servers tried when none are live), so this must not throw
+ {
+ LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+ LBHttpSolrServer.Rsp rsp = slowLB2.request(req);
+ }
+
+ // the last slowLB2 success should have removed the server from its dead server list, so
+ // the next request should succeed even if it doesn't try dead servers.
+ {
+ LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(solrRequest, serverList);
+ req.setNumDeadServersToTry(0);
+ LBHttpSolrServer.Rsp rsp = slowLB2.request(req);
+ }
+
}
- public void testTwoServers() throws Exception {
+ // this test is a subset of testSimple and is no longer needed
+ public void XtestTwoServers() throws Exception {
LBHttpSolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient, solr[0].getUrl(), solr[1].getUrl());
lbHttpSolrServer.setAliveCheckInterval(500);
SolrQuery solrQuery = new SolrQuery("*:*");
@@ -135,15 +255,9 @@
Assert.assertEquals("solr1", name);
solr[1].jetty.stop();
solr[1].jetty = null;
- solr[0].startJetty();
- Thread.sleep(1200);
- try {
- resp = lbHttpSolrServer.query(solrQuery);
- } catch(SolrServerException e) {
- // try again after a pause in case the error is lack of time to start server
- Thread.sleep(3000);
- resp = lbHttpSolrServer.query(solrQuery);
- }
+ solr[0].startJetty(true);
+ Thread.sleep(600);
+ resp = lbHttpSolrServer.query(solrQuery);
name = resp.getResults().get(0).getFieldValue("name").toString();
Assert.assertEquals("solr0", name);
}
@@ -217,10 +331,15 @@
}
public void startJetty() throws Exception {
+ // Same as startJetty(false): waitUntilUp=false is passed through to jetty.start.
+ startJetty(false);
+ }
+
+
+ public void startJetty(boolean waitUntilUp) throws Exception {
jetty = new JettySolrRunner("/solr", port);
System.setProperty("solr.solr.home", getHomeDir());
System.setProperty("solr.data.dir", getDataDir());
- jetty.start();
+ jetty.start(waitUntilUp);
int newPort = jetty.getLocalPort();
if (port != 0 && newPort != port) {
TestCase.fail("TESTING FAILURE: could not grab requested port.");
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=898144&r1=898143&r2=898144&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Tue Jan 12 01:20:54 2010
@@ -36,6 +36,9 @@
// we don't call super.setUp
log.info("####SETUP_START " + getName());
portSeed = 13000;
+
+ // TODO: HACK: inserting dead servers doesn't currently work with these tests
+ deadServers = null;
System.setProperty("zkHost", AbstractZkTestCase.ZOO_KEEPER_ADDRESS);
String zkDir = tmpDir.getAbsolutePath() + File.separator