You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ry...@apache.org on 2012/02/29 08:32:02 UTC
svn commit: r1294995 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/component/
core/src/test/org/apache/solr/
Author: ryan
Date: Wed Feb 29 07:32:02 2012
New Revision: 1294995
URL: http://svn.apache.org/viewvc?rev=1294995&view=rev
Log:
SOLR-3134: Adding test and fix from Russell Black
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/TestDistributedSearch.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1294995&r1=1294994&r2=1294995&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Feb 29 07:32:02 2012
@@ -227,7 +227,8 @@ New Features
* SOLR-2459: Expose LogLevel selection with a RequestHandler rather then servlet
(Stefan Matheis, Upayavira, ryan)
-* SOLR-3134: Include shard info in distributed response when shards.info=true (ryan)
+* SOLR-3134: Include shard info in distributed response when shards.info=true
+ (Russell Black, ryan)
Optimizations
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java?rev=1294995&r1=1294994&r2=1294995&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java Wed Feb 29 07:32:02 2012
@@ -180,26 +180,11 @@ public class HttpShardHandler extends Sh
pending.add( completionService.submit(task) );
}
- /** returns a ShardResponse of the last response correlated with a ShardRequest */
- ShardResponse take() {
- while (pending.size() > 0) {
- try {
- Future<ShardResponse> future = completionService.take();
- pending.remove(future);
- ShardResponse rsp = future.get();
- rsp.getShardRequest().responses.add(rsp);
- if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
- return rsp;
- }
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (ExecutionException e) {
- // should be impossible... the problem with catching the exception
- // at this level is we don't know what ShardRequest it applied to
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
- }
- }
- return null;
+ /**
+ * Returns a ShardResponse of the last response correlated with a ShardRequest.
+ * Unlike {@link #takeCompletedOrError()}, this won't return early if it runs
+ * into an error: it delegates to take(false), so the bailOnError check is off.
+ **/
+ public ShardResponse takeCompletedIncludingErrors() {
+ return take(false);
}
@@ -207,12 +192,17 @@ public class HttpShardHandler extends Sh
* or immediately returns a ShardResponse if there was an error detected
*/
public ShardResponse takeCompletedOrError() {
+ return take(true);
+ }
+
+ private ShardResponse take(boolean bailOnError) {
+
while (pending.size() > 0) {
try {
Future<ShardResponse> future = completionService.take();
pending.remove(future);
ShardResponse rsp = future.get();
- if (rsp.getException() != null) return rsp; // if exception, return immediately
+ if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
// for a request was received. Otherwise we might return the same
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=1294995&r1=1294994&r2=1294995&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java Wed Feb 29 07:32:02 2012
@@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.ReaderUtil;
import org.apache.lucene.util.UnicodeUtil;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
@@ -75,6 +76,8 @@ import org.apache.solr.search.grouping.e
import org.apache.solr.util.SolrPluginUtils;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.net.URL;
import java.util.*;
@@ -771,25 +774,47 @@ public class QueryComponent extends Sear
NamedList<Object> shardInfo = null;
if(rb.req.getParams().getBool(ShardParams.SHARDS_INFO, false)) {
- shardInfo = (NamedList<Object>) rb.rsp.getValues().get(ShardParams.SHARDS_INFO);
- if(shardInfo==null) {
- shardInfo = new SimpleOrderedMap<Object>();
- rb.rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);
- }
+ shardInfo = new SimpleOrderedMap<Object>();
+ rb.rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);
}
long numFound = 0;
Float maxScore=null;
for (ShardResponse srsp : sreq.responses) {
- SolrDocumentList docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
+ SolrDocumentList docs = null;
if(shardInfo!=null) {
SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();
- nl.add("numFound", docs.getNumFound());
- nl.add("maxScore", docs.getMaxScore());
- nl.add("time", srsp.getSolrResponse().getElapsedTime());
+
+ if (srsp.getException() != null) {
+ Throwable t = srsp.getException();
+ if(t instanceof SolrServerException) {
+ t = ((SolrServerException)t).getCause();
+ }
+ nl.add("error", t.toString() );
+ StringWriter trace = new StringWriter();
+ t.printStackTrace(new PrintWriter(trace));
+ nl.add("trace", trace.toString() );
+ }
+ else {
+ docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
+ nl.add("numFound", docs.getNumFound());
+ nl.add("maxScore", docs.getMaxScore());
+ }
+ if(srsp.getSolrResponse()!=null) {
+ nl.add("time", srsp.getSolrResponse().getElapsedTime());
+ }
+
shardInfo.add(srsp.getShard(), nl);
}
+ // now that we've added the shard info, let's only proceed if we have no error.
+ if (srsp.getException() != null) {
+ continue;
+ }
+
+ if (docs == null) { // could have been initialized in the shards info block above
+ docs = (SolrDocumentList)srsp.getSolrResponse().getResponse().get("response");
+ }
// calculate global maxScore and numDocsFound
if (docs.getMaxScore() != null) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1294995&r1=1294994&r2=1294995&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Feb 29 07:32:02 2012
@@ -18,14 +18,11 @@
package org.apache.solr.handler.component;
import org.apache.lucene.queryparser.classic.ParseException;
-import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.RTimer;
-import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
@@ -38,8 +35,6 @@ import org.apache.solr.util.plugin.SolrC
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.*;
/**
@@ -285,38 +280,17 @@ public class SearchHandler extends Reque
// now wait for replies, but if anyone puts more requests on
// the outgoing queue, send them out immediately (by exiting
// this loop)
+ boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
while (rb.outgoing.size() == 0) {
- ShardResponse srsp = shardHandler1.takeCompletedOrError();
+ ShardResponse srsp = tolerant ?
+ shardHandler1.takeCompletedIncludingErrors():
+ shardHandler1.takeCompletedOrError();
if (srsp == null) break; // no more requests to wait for
// Was there an exception?
if (srsp.getException() != null) {
- // If things are tolerant, just continue
- if(rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false)) {
- if( rb.req.getParams().getBool(ShardParams.SHARDS_INFO, false) ) {
- NamedList<Object> sinfo = (NamedList<Object>) rb.rsp.getValues().get(ShardParams.SHARDS_INFO);
- if(sinfo==null) {
- sinfo = new SimpleOrderedMap<Object>();
- rb.rsp.getValues().add(ShardParams.SHARDS_INFO,sinfo);
- }
-
- SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();
- Throwable t = srsp.getException();
- if(t instanceof SolrServerException) {
- t = ((SolrServerException)t).getCause();
- }
- nl.add("error", t.toString() );
-
- StringWriter trace = new StringWriter();
- t.printStackTrace(new PrintWriter(trace));
- nl.add("trace", trace.toString() );
- if(srsp.getSolrResponse()!=null){
- nl.add("time", srsp.getSolrResponse().getElapsedTime());
- }
- sinfo.add(srsp.getShard(), nl);
- }
- }
- else { // If so, abort everything and rethrow
+ // If things are not tolerant, abort everything and rethrow
+ if(!tolerant) {
shardHandler1.cancelAll();
if (srsp.getException() instanceof SolrException) {
throw (SolrException)srsp.getException();
@@ -336,8 +310,8 @@ public class SearchHandler extends Reque
}
for(SearchComponent c : components) {
- c.finishStage(rb);
- }
+ c.finishStage(rb);
+ }
// we are done when the next stage is MAX_VALUE
} while (nextStage != Integer.MAX_VALUE);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java?rev=1294995&r1=1294994&r2=1294995&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java Wed Feb 29 07:32:02 2012
@@ -22,6 +22,7 @@ import org.apache.solr.common.params.Mod
public abstract class ShardHandler {
public abstract void checkDistributed(ResponseBuilder rb);
public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) ;
+ // Counterpart of takeCompletedOrError(); HttpShardHandler implements it so that it
+ // does not return early on a shard error. NOTE(review): confirm other subclasses agree.
+ public abstract ShardResponse takeCompletedIncludingErrors();
public abstract ShardResponse takeCompletedOrError();
public abstract void cancelAll();
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/TestDistributedSearch.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/TestDistributedSearch.java?rev=1294995&r1=1294994&r2=1294995&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/TestDistributedSearch.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/TestDistributedSearch.java Wed Feb 29 07:32:02 2012
@@ -17,8 +17,17 @@
package org.apache.solr;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
import org.apache.commons.lang.StringUtils;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
@@ -292,7 +301,32 @@ public class TestDistributedSearch exten
assertNotNull("missing shard info", sinfo);
assertEquals("should have an entry for each shard ["+sinfo+"] "+shards, cnt, sinfo.size());
-
+ // test shards.tolerant=true
+ for(int numDownServers = 0; numDownServers < jettys.size()-1; numDownServers++)
+ {
+ List<JettySolrRunner> upJettys = new ArrayList<JettySolrRunner>(jettys);
+ List<SolrServer> upClients = new ArrayList<SolrServer>(clients);
+ List<JettySolrRunner> downJettys = new ArrayList<JettySolrRunner>();
+ List<String> upShards = new ArrayList<String>(Arrays.asList(shardsArr));
+ for(int i=0; i<numDownServers; i++)
+ {
+ // shut down some of the jettys
+ int indexToRemove = r.nextInt(upJettys.size());
+ JettySolrRunner downJetty = upJettys.remove(indexToRemove);
+ upClients.remove(indexToRemove);
+ upShards.remove(indexToRemove);
+ ChaosMonkey.stop(downJetty);
+ downJettys.add(downJetty);
+ }
+
+ queryPartialResults(upShards, upClients, "q","*:*",ShardParams.SHARDS_INFO,"true",ShardParams.SHARDS_TOLERANT,"true");
+
+ // restart the jettys
+ for (JettySolrRunner downJetty : downJettys) {
+ downJetty.start();
+ }
+ }
+
// This index has the same number for every field
// TODO: This test currently fails because debug info is obtained only
@@ -301,5 +335,90 @@ public class TestDistributedSearch exten
// Thread.sleep(10000000000L);
}
+
+  /**
+   * Runs the given query against the control client and against a randomly chosen
+   * "up" server, then validates the shard info of the distributed response with
+   * {@link #comparePartialResponses}. When {@code stress > 0}, the distributed query
+   * is additionally repeated {@code stress} times from each of {@code nThreads} threads.
+   *
+   * @param upShards  shard addresses whose servers are expected to be running
+   * @param upClients clients for the servers that are expected to be running
+   * @param q         alternating parameter names and values
+   * @throws Exception if a query fails or a stress thread cannot be joined
+   */
+  protected void queryPartialResults(final List<String> upShards, final List<SolrServer> upClients, Object... q) throws Exception {
+
+    final ModifiableSolrParams params = new ModifiableSolrParams();
+
+    for (int i = 0; i < q.length; i += 2) {
+      params.add(q[i].toString(), q[i + 1].toString());
+    }
+    // TODO: look into why passing true causes failures
+    params.set("distrib", "false");
+    final QueryResponse controlRsp = controlClient.query(params);
+    validateControlData(controlRsp);
+
+    params.remove("distrib");
+    setDistributedParams(params);
+
+    QueryResponse rsp = queryRandomUpServer(params, upClients);
+
+    comparePartialResponses(rsp, controlRsp, upShards);
+
+    if (stress > 0) {
+      log.info("starting stress...");
+      Thread[] threads = new Thread[nThreads];
+      for (int i = 0; i < threads.length; i++) {
+        threads[i] = new Thread() {
+          @Override
+          public void run() {
+            for (int j = 0; j < stress; j++) {
+              // Pick only among servers that are still up: the full client list
+              // also contains the stopped servers, which cannot answer a query.
+              int which = r.nextInt(upClients.size());
+              SolrServer client = upClients.get(which);
+              try {
+                QueryResponse rsp = client.query(new ModifiableSolrParams(params));
+                if (verifyStress) {
+                  comparePartialResponses(rsp, controlRsp, upShards);
+                }
+              } catch (SolrServerException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }
+        };
+        threads[i].start();
+      }
+
+      for (Thread thread : threads) {
+        thread.join();
+      }
+    }
+  }
+  /**
+   * Sends {@code params} to one client picked at random from {@code upClients}
+   * and returns that client's response.
+   */
+  protected QueryResponse queryRandomUpServer(ModifiableSolrParams params, List<SolrServer> upClients) throws SolrServerException {
+    final int pick = r.nextInt(upClients.size());
+    return upClients.get(pick).query(params);
+  }
+
+  /**
+   * Checks the {@code ShardParams.SHARDS_INFO} section of a distributed response:
+   * it must exist, hold one entry per element of {@code shardsArr}, and every entry
+   * must match a known shard. Entries for shards listed in {@code upShards} must carry
+   * "numFound"; entries for all other shards must carry "error".
+   * The {@code controlRsp} argument is not used by this method.
+   */
+  protected void comparePartialResponses(QueryResponse rsp, QueryResponse controlRsp, List<String> upShards)
+  {
+    NamedList<?> shardInfos = (NamedList<?>) rsp.getResponse().get(ShardParams.SHARDS_INFO);
+
+    assertNotNull("missing shard info", shardInfos);
+    assertEquals("should have an entry for each shard ["+shardInfos+"] "+shards, shardsArr.length, shardInfos.size());
+    // match every reported entry against the configured shards
+    for (Map.Entry<String,?> shardEntry : shardInfos) {
+      String shardName = shardEntry.getKey();
+      NamedList<?> details = (NamedList<?>) shardEntry.getValue();
+      boolean matched = false;
+      for (String known : shardsArr) {
+        if (!shardName.contains(known)) {
+          continue;
+        }
+        matched = true;
+        // an up shard must report a hit count, a down shard must report an error
+        if (upShards.contains(known)) {
+          assertTrue("Expected to find numFound in the up shard info", details.get("numFound") != null);
+        } else {
+          assertTrue("Expected to find error in the down shard info", details.get("error") != null);
+        }
+      }
+      assertTrue("Couldn't find shard " + shardName + " represented in shards info", matched);
+    }
+  }
+
}