You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2012/03/07 19:17:26 UTC
svn commit: r1298032 - in
/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component:
HttpShardHandler.java HttpShardHandlerFactory.java SearchHandler.java
ShardHandler.java ShardHandlerFactory.java ShardResponse.java
Author: erick
Date: Wed Mar 7 18:17:26 2012
New Revision: 1298032
URL: http://svn.apache.org/viewvc?rev=1298032&view=rev
Log:
SOLR-3079
Added:
lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
Modified:
lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java
Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java Wed Mar 7 18:17:26 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.request.SolrQueryRequest;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+public class HttpShardHandler extends ShardHandler {
+
+ private HttpShardHandlerFactory httpShardHandlerFactory;
+ private CompletionService<ShardResponse> completionService;
+ private Set<Future<ShardResponse>> pending;
+ private Map<String, List<String>> shardToURLs;
+
+ public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
+ this.httpShardHandlerFactory = httpShardHandlerFactory;
+ completionService = new ExecutorCompletionService<ShardResponse>(httpShardHandlerFactory.commExecutor);
+ pending = new HashSet<Future<ShardResponse>>();
+
+ // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+ // This is primarily to keep track of what order we should use to query the replicas of a shard
+ // so that we use the same replica for all phases of a distributed request.
+ shardToURLs = new HashMap<String, List<String>>();
+ }
+
+ private static class SimpleSolrResponse extends SolrResponse {
+ long elapsedTime;
+ NamedList<Object> nl;
+
+ @Override
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ @Override
+ public NamedList<Object> getResponse() {
+ return nl;
+ }
+
+ @Override
+ public void setResponse(NamedList<Object> rsp) {
+ nl = rsp;
+ }
+ }
+
+ // Not thread safe... don't use in Callable.
+ // Don't modify the returned URL list.
+ private List<String> getURLs(String shard) {
+ List<String> urls = shardToURLs.get(shard);
+ if (urls == null) {
+ urls = StrUtils.splitSmart(shard, "|", true);
+
+ // convert shard to URL
+ for (int i = 0; i < urls.size(); i++) {
+ urls.set(i, httpShardHandlerFactory.scheme + urls.get(i));
+ }
+
+ //
+ // Shuffle the list instead of use round-robin by default.
+ // This prevents accidental synchronization where multiple shards could get in sync
+ // and query the same replica at the same time.
+ //
+ if (urls.size() > 1)
+ Collections.shuffle(urls, httpShardHandlerFactory.r);
+ shardToURLs.put(shard, urls);
+ }
+ return urls;
+ }
+
+ public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
+ // do this outside of the callable for thread safety reasons
+ final List<String> urls = getURLs(shard);
+
+ Callable<ShardResponse> task = new Callable<ShardResponse>() {
+ public ShardResponse call() throws Exception {
+ ShardResponse srsp = new ShardResponse();
+ srsp.setShardRequest(sreq);
+ srsp.setShard(shard);
+ SimpleSolrResponse ssr = new SimpleSolrResponse();
+ srsp.setSolrResponse(ssr);
+ long startTime = System.currentTimeMillis();
+
+ try {
+ // String url = "http://" + shard + "/select";
+ String url = "http://" + shard;
+
+ params.remove(CommonParams.WT); // use default (currently javabin)
+ params.remove(CommonParams.VERSION);
+
+ SolrServer server = new CommonsHttpSolrServer(url, httpShardHandlerFactory.client);
+ // SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
+ // use generic request to avoid extra processing of queries
+ QueryRequest req = new QueryRequest(params);
+ req.setMethod(SolrRequest.METHOD.POST);
+
+ // no need to set the response parser as binary is the default
+ // req.setResponseParser(new BinaryResponseParser());
+ // srsp.rsp = server.request(req);
+ // srsp.rsp = server.query(sreq.params);
+
+ ssr.nl = server.request(req);
+ } catch (Throwable th) {
+ srsp.setException(th);
+ if (th instanceof SolrException) {
+ srsp.setResponseCode(((SolrException) th).code());
+ } else {
+ srsp.setResponseCode(-1);
+ }
+ }
+
+ ssr.elapsedTime = System.currentTimeMillis() - startTime;
+
+ return srsp;
+ }
+ };
+ pending.add(completionService.submit(task));
+ }
+
+ /**
+ * returns a ShardResponse of the last response correlated with a ShardRequest,
+ * or immediately returns a ShardResponse if there was an error detected
+ */
+ public ShardResponse takeCompletedOrError() {
+ while (pending.size() > 0) {
+ try {
+ Future<ShardResponse> future = completionService.take();
+ pending.remove(future);
+ ShardResponse rsp = future.get();
+ if (rsp.getException() != null) return rsp; // if exception, return immediately
+ // add response to the response list... we do this after the take() and
+ // not after the completion of "call" so we know when the last response
+ // for a request was received. Otherwise we might return the same
+ // request more than once.
+ rsp.getShardRequest().responses.add(rsp);
+ if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
+ return rsp;
+ }
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } catch (ExecutionException e) {
+ // should be impossible... the problem with catching the exception
+ // at this level is we don't know what ShardRequest it applied to
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e);
+ }
+ }
+ return null;
+ }
+
+ public void cancelAll() {
+ for (Future<ShardResponse> future : pending) {
+ // TODO: any issues with interrupting? shouldn't be if
+ // there are finally blocks to release connections.
+ future.cancel(true);
+ }
+ }
+
+ public void checkDistributed(ResponseBuilder rb) {
+ SolrQueryRequest req = rb.req;
+ SolrParams params = req.getParams();
+
+ String shards_rows = params.get(ShardParams.SHARDS_ROWS);
+ if (shards_rows != null) {
+ rb.shards_rows = Integer.parseInt(shards_rows);
+ }
+ String shards_start = params.get(ShardParams.SHARDS_START);
+ if (shards_start != null) {
+ rb.shards_start = Integer.parseInt(shards_start);
+ }
+ }
+
+}
+
Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java Wed Mar 7 18:17:26 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author noble.paul@teamaol.com (noblep01)
+ * Date: 6/21/11
+ * Time: 12:14 PM
+ */
+public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized {
+ protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class);
+
+ // We want an executor that doesn't take up any resources if
+ // it's not used, so it could be created statically for
+ // the distributed search component if desired.
+ //
+ // Consider CallerRuns policy and a lower max threads to throttle
+ // requests at some point (or should we simply return failure?)
+ Executor commExecutor = new ThreadPoolExecutor(
+ 0,
+ Integer.MAX_VALUE,
+ 5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
+ new SynchronousQueue<Runnable>() // directly hand off tasks
+ );
+
+ HttpClient client;
+ Random r = new Random();
+ int soTimeout = 0; //current default values
+ int connectionTimeout = 0; //current default values
+ int maxConnectionsPerHost = 20;
+
+ public String scheme = "http://"; //current default values
+ // socket timeout measured in ms, closes a socket if read
+ // takes longer than x ms to complete. throws
+ // java.net.SocketTimeoutException: Read timed out exception
+ static final String INIT_SO_TIMEOUT = "socketTimeout";
+
+ // connection timeout measures in ms, closes a socket if connection
+ // cannot be established within x ms. with a
+ // java.net.SocketTimeoutException: Connection timed out
+ static final String INIT_CONNECTION_TIMEOUT = "connTimeout";
+
+ // URL scheme to be used in distributed search.
+ static final String INIT_URL_SCHEME = "urlScheme";
+
+ // Maximum connections allowed per host
+ static final String INIT_MAX_CONNECTION_PER_HOST = "maxConnectionsPerHost";
+
+ public ShardHandler getShardHandler() {
+ return new HttpShardHandler(this);
+ }
+
+ public void init(PluginInfo info) {
+
+ if (info.initArgs != null) {
+ Object so = info.initArgs.get(INIT_SO_TIMEOUT);
+ if (so != null) {
+ soTimeout = (Integer) so;
+ log.info("Setting socketTimeout to: " + soTimeout);
+ }
+
+ Object urlScheme = info.initArgs.get(INIT_URL_SCHEME);
+ if (urlScheme != null) {
+ scheme = urlScheme + "://";
+ log.info("Setting urlScheme to: " + urlScheme);
+ }
+
+ Object co = info.initArgs.get(INIT_CONNECTION_TIMEOUT);
+ if (co != null) {
+ connectionTimeout = (Integer) co;
+ log.info("Setting shard-connection-timeout to: " + connectionTimeout);
+ }
+
+ Object maxConnections = info.initArgs.get(INIT_MAX_CONNECTION_PER_HOST);
+ if (maxConnections != null) {
+ maxConnectionsPerHost = (Integer) maxConnections;
+ }
+ }
+
+ MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+ mgr.getParams().setDefaultMaxConnectionsPerHost(maxConnectionsPerHost);
+ mgr.getParams().setMaxTotalConnections(10000);
+ mgr.getParams().setConnectionTimeout(connectionTimeout);
+ mgr.getParams().setSoTimeout(soTimeout);
+ // mgr.getParams().setStaleCheckingEnabled(false);
+
+ client = new HttpClient(mgr);
+
+ // prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
+ DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
+ client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
+ }
+}
Modified: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1298032&r1=1298031&r2=1298032&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Mar 7 18:17:26 2012
@@ -17,58 +17,40 @@
package org.apache.solr.handler.component;
-import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.RTimer;
-import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.RTimer;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.client.solrj.SolrServer;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.apache.solr.util.plugin.SolrCoreAware;
-import org.apache.solr.core.SolrCore;
-import org.apache.lucene.queryParser.ParseException;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.HttpClient;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.*;
-import java.util.concurrent.*;
/**
*
* Refer SOLR-281
*
*/
-public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
-{
+public class SearchHandler extends RequestHandlerBase implements SolrCoreAware, PluginInfoInitialized {
static final String INIT_COMPONENTS = "components";
static final String INIT_FIRST_COMPONENTS = "first-components";
static final String INIT_LAST_COMPONENTS = "last-components";
- // socket timeout measured in ms, closes a socket if read
- // takes longer than x ms to complete. throws
- // java.net.SocketTimeoutException: Read timed out exception
- static final String INIT_SO_TIMEOUT = "shard-socket-timeout";
-
- // connection timeout measures in ms, closes a socket if connection
- // cannot be established within x ms. with a
- // java.net.SocketTimeoutException: Connection timed out
- static final String INIT_CONNECTION_TIMEOUT = "shard-connection-timeout";
- static int soTimeout = 0; //current default values
- static int connectionTimeout = 0; //current default values
-
protected static Logger log = LoggerFactory.getLogger(SearchHandler.class);
protected List<SearchComponent> components = null;
+ private ShardHandlerFactory shardHandlerFactory = new HttpShardHandlerFactory();
+ private PluginInfo shfInfo;
protected List<String> getDefaultComponents()
{
@@ -82,6 +64,16 @@ public class SearchHandler extends Reque
return names;
}
+ public void init(PluginInfo info) {
+ init(info.initArgs);
+ for (PluginInfo child : info.children) {
+ if("shardHandlerFactory".equals(child.type)){
+ this.shfInfo = child;
+ break;
+ }
+ }
+ }
+
/**
* Initialize the components based on name. Note, if using {@link #INIT_FIRST_COMPONENTS} or {@link #INIT_LAST_COMPONENTS},
* then the {@link DebugComponent} will always occur last. If this is not desired, then one must explicitly declare all components using
@@ -136,17 +128,13 @@ public class SearchHandler extends Reque
log.info("Adding debug component:" + dbgCmp);
}
- Object co = initArgs.get(INIT_CONNECTION_TIMEOUT);
- if (co != null) {
- connectionTimeout = (Integer) co;
- log.info("Setting shard-connection-timeout to: " + connectionTimeout);
+ if(shfInfo == null) {
+ Map m = new HashMap();
+ m.put("class", HttpShardHandlerFactory.class.getName());
+ shfInfo = new PluginInfo("ShardHandlerFactory", m, null, Collections.<PluginInfo>emptyList());
}
- Object so = initArgs.get(INIT_SO_TIMEOUT);
- if (so != null) {
- soTimeout = (Integer) so;
- log.info("Setting shard-socket-timeout to: " + soTimeout);
- }
+ shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
}
public List<SearchComponent> getComponents() {
@@ -168,6 +156,9 @@ public class SearchHandler extends Reque
final RTimer timer = rb.isDebug() ? new RTimer() : null;
+ ShardHandler shardHandler1 = shardHandlerFactory.getShardHandler();
+ shardHandler1.checkDistributed(rb);
+
if (timer == null) {
// non-debugging prepare phase
for( SearchComponent c : components ) {
@@ -216,8 +207,6 @@ public class SearchHandler extends Reque
} else {
// a distributed request
- HttpCommComponent comm = new HttpCommComponent();
-
if (rb.outgoing == null) {
rb.outgoing = new LinkedList<ShardRequest>();
}
@@ -264,7 +253,7 @@ public class SearchHandler extends Reque
} else {
params.set(CommonParams.QT, shardHandler);
}
- comm.submit(sreq, shard, params);
+ shardHandler1.submit(sreq, shard, params);
}
}
@@ -273,13 +262,13 @@ public class SearchHandler extends Reque
// the outgoing queue, send them out immediately (by exiting
// this loop)
while (rb.outgoing.size() == 0) {
- ShardResponse srsp = comm.takeCompletedOrError();
+ ShardResponse srsp = shardHandler1.takeCompletedOrError();
if (srsp == null) break; // no more requests to wait for
// Was there an exception? If so, abort everything and
// rethrow
if (srsp.getException() != null) {
- comm.cancelAll();
+ shardHandler1.cancelAll();
if (srsp.getException() instanceof SolrException) {
throw (SolrException)srsp.getException();
} else {
@@ -335,172 +324,3 @@ public class SearchHandler extends Reque
return "$URL$";
}
}
-
-
-// TODO: generalize how a comm component can fit into search component framework
-// TODO: statics should be per-core singletons
-
-class HttpCommComponent {
-
- // We want an executor that doesn't take up any resources if
- // it's not used, so it could be created statically for
- // the distributed search component if desired.
- //
- // Consider CallerRuns policy and a lower max threads to throttle
- // requests at some point (or should we simply return failure?)
- static Executor commExecutor = new ThreadPoolExecutor(
- 0,
- Integer.MAX_VALUE,
- 5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
- new SynchronousQueue<Runnable>() // directly hand off tasks
- );
-
-
- static HttpClient client;
-
- static {
- MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
- mgr.getParams().setDefaultMaxConnectionsPerHost(20);
- mgr.getParams().setMaxTotalConnections(10000);
- mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
- mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
- // mgr.getParams().setStaleCheckingEnabled(false);
- client = new HttpClient(mgr);
- }
-
- CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
- Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
-
- HttpCommComponent() {
- }
-
- private static class SimpleSolrResponse extends SolrResponse {
- long elapsedTime;
- NamedList<Object> nl;
-
- @Override
- public long getElapsedTime() {
- return elapsedTime;
- }
-
- @Override
- public NamedList<Object> getResponse() {
- return nl;
- }
-
- @Override
- public void setResponse(NamedList<Object> rsp) {
- nl = rsp;
- }
- }
-
- void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
- Callable<ShardResponse> task = new Callable<ShardResponse>() {
- public ShardResponse call() throws Exception {
-
- ShardResponse srsp = new ShardResponse();
- srsp.setShardRequest(sreq);
- srsp.setShard(shard);
- SimpleSolrResponse ssr = new SimpleSolrResponse();
- srsp.setSolrResponse(ssr);
- long startTime = System.currentTimeMillis();
-
- try {
- // String url = "http://" + shard + "/select";
- String url = "http://" + shard;
-
- params.remove(CommonParams.WT); // use default (currently javabin)
- params.remove(CommonParams.VERSION);
-
- SolrServer server = new CommonsHttpSolrServer(url, client);
- // SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
- // use generic request to avoid extra processing of queries
- QueryRequest req = new QueryRequest(params);
- req.setMethod(SolrRequest.METHOD.POST);
-
- // no need to set the response parser as binary is the default
- // req.setResponseParser(new BinaryResponseParser());
- // srsp.rsp = server.request(req);
- // srsp.rsp = server.query(sreq.params);
-
- ssr.nl = server.request(req);
- } catch (Throwable th) {
- srsp.setException(th);
- if (th instanceof SolrException) {
- srsp.setResponseCode(((SolrException)th).code());
- } else {
- srsp.setResponseCode(-1);
- }
- }
-
- ssr.elapsedTime = System.currentTimeMillis() - startTime;
-
- return srsp;
- }
- };
-
- pending.add( completionService.submit(task) );
- }
-
- /** returns a ShardResponse of the last response correlated with a ShardRequest */
- ShardResponse take() {
- while (pending.size() > 0) {
- try {
- Future<ShardResponse> future = completionService.take();
- pending.remove(future);
- ShardResponse rsp = future.get();
- rsp.getShardRequest().responses.add(rsp);
- if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
- return rsp;
- }
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (ExecutionException e) {
- // should be impossible... the problem with catching the exception
- // at this level is we don't know what ShardRequest it applied to
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
- }
- }
- return null;
- }
-
-
- /** returns a ShardResponse of the last response correlated with a ShardRequest,
- * or immediately returns a ShardResponse if there was an error detected
- */
- ShardResponse takeCompletedOrError() {
- while (pending.size() > 0) {
- try {
- Future<ShardResponse> future = completionService.take();
- pending.remove(future);
- ShardResponse rsp = future.get();
- if (rsp.getException() != null) return rsp; // if exception, return immediately
- // add response to the response list... we do this after the take() and
- // not after the completion of "call" so we know when the last response
- // for a request was received. Otherwise we might return the same
- // request more than once.
- rsp.getShardRequest().responses.add(rsp);
- if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
- return rsp;
- }
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (ExecutionException e) {
- // should be impossible... the problem with catching the exception
- // at this level is we don't know what ShardRequest it applied to
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
- }
- }
- return null;
- }
-
-
- void cancelAll() {
- for (Future<ShardResponse> future : pending) {
- // TODO: any issues with interrupting? shouldn't be if
- // there are finally blocks to release connections.
- future.cancel(true);
- }
- }
-
-}
Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java Wed Mar 7 18:17:26 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+public abstract class ShardHandler {
+ public abstract void checkDistributed(ResponseBuilder rb);
+ public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) ;
+ public abstract ShardResponse takeCompletedOrError();
+ public abstract void cancelAll();
+}
Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java Wed Mar 7 18:17:26 2012
@@ -0,0 +1,5 @@
+package org.apache.solr.handler.component;
+
+public abstract class ShardHandlerFactory {
+ public abstract ShardHandler getShardHandler();
+}
Modified: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java?rev=1298032&r1=1298031&r2=1298032&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java (original)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java Wed Mar 7 18:17:26 2012
@@ -56,12 +56,12 @@ public final class ShardResponse {
return shard;
}
- void setShardRequest(ShardRequest rsp)
+ public void setShardRequest(ShardRequest rsp)
{
this.req = rsp;
}
- void setSolrResponse(SolrResponse rsp)
+ public void setSolrResponse(SolrResponse rsp)
{
this.rsp = rsp;
}