You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2012/03/07 19:17:26 UTC

svn commit: r1298032 - in /lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component: HttpShardHandler.java HttpShardHandlerFactory.java SearchHandler.java ShardHandler.java ShardHandlerFactory.java ShardResponse.java

Author: erick
Date: Wed Mar  7 18:17:26 2012
New Revision: 1298032

URL: http://svn.apache.org/viewvc?rev=1298032&view=rev
Log:
SOLR-3079

Added:
    lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
    lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
    lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
    lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
Modified:
    lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
    lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java

Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java Wed Mar  7 18:17:26 2012
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.request.SolrQueryRequest;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * {@link ShardHandler} that POSTs shard requests over HTTP on the factory's executor and drains the
+ * results through a {@link CompletionService}. State is unsynchronized: use from a single thread.
+ */
+public class HttpShardHandler extends ShardHandler {
+
+  private final HttpShardHandlerFactory httpShardHandlerFactory;
+  private final CompletionService<ShardResponse> completionService;
+  private final Set<Future<ShardResponse>> pending;
+  private final Map<String, List<String>> shardToURLs;
+
+  /** Creates a handler backed by the executor, HTTP client, scheme and random source of the factory. */
+  public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
+    this.httpShardHandlerFactory = httpShardHandlerFactory;
+    completionService = new ExecutorCompletionService<ShardResponse>(httpShardHandlerFactory.commExecutor);
+    pending = new HashSet<Future<ShardResponse>>();
+    // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+    // This is primarily to keep track of what order we should use to query the replicas of a shard
+    // so that we use the same replica for all phases of a distributed request.
+    shardToURLs = new HashMap<String, List<String>>();
+  }
+
+  /** Bare {@link SolrResponse} carrying the raw response and the elapsed time set by the task. */
+  private static class SimpleSolrResponse extends SolrResponse {
+    long elapsedTime;
+    NamedList<Object> nl;
+
+    @Override
+    public long getElapsedTime() {
+      return elapsedTime;
+    }
+
+    @Override
+    public NamedList<Object> getResponse() {
+      return nl;
+    }
+
+    @Override
+    public void setResponse(NamedList<Object> rsp) {
+      nl = rsp;
+    }
+  }
+
+  // Returns the scheme-prefixed, shuffled replica URLs of a "|"-separated shard, cached per shard.
+  // Not thread safe... don't use in Callable.
+  // Don't modify the returned URL list.
+  private List<String> getURLs(String shard) {
+    List<String> urls = shardToURLs.get(shard);
+    if (urls == null) {
+      urls = StrUtils.splitSmart(shard, "|", true);
+      // convert shard to URL
+      for (int i = 0; i < urls.size(); i++) {
+        urls.set(i, httpShardHandlerFactory.scheme + urls.get(i));
+      }
+      // Shuffle the list instead of use round-robin by default.
+      // This prevents accidental synchronization where multiple shards could get in sync
+      // and query the same replica at the same time.
+      if (urls.size() > 1) {
+        Collections.shuffle(urls, httpShardHandlerFactory.r);
+      }
+      shardToURLs.put(shard, urls);
+    }
+    return urls;
+  }
+
+  /**
+   * Queues an asynchronous POST of {@code params} to {@code shard}. A failed call is not thrown:
+   * it is recorded on the resulting {@link ShardResponse} (response code -1 unless a SolrException).
+   */
+  public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
+    // do this outside of the callable for thread safety reasons
+    final List<String> urls = getURLs(shard);
+    // Use the factory's configured scheme rather than a hard-coded "http://". Only the first
+    // (shuffled) replica is contacted; if the shard split to nothing, prefix the raw string.
+    final String url = urls.isEmpty() ? httpShardHandlerFactory.scheme + shard : urls.get(0);
+
+    Callable<ShardResponse> task = new Callable<ShardResponse>() {
+      public ShardResponse call() throws Exception {
+        ShardResponse srsp = new ShardResponse();
+        srsp.setShardRequest(sreq);
+        srsp.setShard(shard);
+        SimpleSolrResponse ssr = new SimpleSolrResponse();
+        srsp.setSolrResponse(ssr);
+        long startTime = System.currentTimeMillis();
+        try {
+          params.remove(CommonParams.WT); // use default (currently javabin)
+          params.remove(CommonParams.VERSION);
+          SolrServer server = new CommonsHttpSolrServer(url, httpShardHandlerFactory.client);
+          // use generic request to avoid extra processing of queries
+          QueryRequest req = new QueryRequest(params);
+          req.setMethod(SolrRequest.METHOD.POST);
+          // no need to set the response parser as binary is the default
+          ssr.nl = server.request(req);
+        } catch (Throwable th) {
+          srsp.setException(th);
+          if (th instanceof SolrException) {
+            srsp.setResponseCode(((SolrException) th).code());
+          } else {
+            srsp.setResponseCode(-1);
+          }
+        }
+
+        ssr.elapsedTime = System.currentTimeMillis() - startTime;
+        return srsp;
+      }
+    };
+    pending.add(completionService.submit(task));
+  }
+
+  /**
+   * returns a ShardResponse of the last response correlated with a ShardRequest,
+   * or immediately returns a ShardResponse if there was an error detected.
+   * Returns null when nothing is pending; an interrupted wait surfaces as a SolrException.
+   */
+  public ShardResponse takeCompletedOrError() {
+    while (pending.size() > 0) {
+      try {
+        Future<ShardResponse> future = completionService.take();
+        pending.remove(future);
+        ShardResponse rsp = future.get();
+        if (rsp.getException() != null) return rsp; // if exception, return immediately
+        // add response to the response list... we do this after the take() and
+        // not after the completion of "call" so we know when the last response
+        // for a request was received.  Otherwise we might return the same
+        // request more than once.
+        rsp.getShardRequest().responses.add(rsp);
+        if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
+          return rsp;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the flag that take() cleared
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      } catch (ExecutionException e) {
+        // should be impossible... the problem with catching the exception
+        // at this level is we don't know what ShardRequest it applied to
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e);
+      }
+    }
+    return null;
+  }
+
+  /** Cancels, with interruption, every future still held in the pending set. */
+  public void cancelAll() {
+    for (Future<ShardResponse> future : pending) {
+      // TODO: any issues with interrupting?  shouldn't be if
+      // there are finally blocks to release connections.
+      future.cancel(true);
+    }
+  }
+
+  /** Copies the ShardParams.SHARDS_ROWS / SHARDS_START request params, when present, onto {@code rb}. */
+  public void checkDistributed(ResponseBuilder rb) {
+    SolrQueryRequest req = rb.req;
+    SolrParams params = req.getParams();
+    String shards_rows = params.get(ShardParams.SHARDS_ROWS);
+    if (shards_rows != null) {
+      rb.shards_rows = Integer.parseInt(shards_rows);
+    }
+    String shards_start = params.get(ShardParams.SHARDS_START);
+    if (shards_start != null) {
+      rb.shards_start = Integer.parseInt(shards_start);
+    }
+  }
+}
+

Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java Wed Mar  7 18:17:26 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Creates {@link HttpShardHandler}s that share one thread pool and one commons-httpclient
+ * {@link HttpClient}; the client is built by {@link #init(PluginInfo)} from the plugin's init args.
+ * @author noble.paul@teamaol.com (noblep01) Date: 6/21/11 Time: 12:14 PM
+ */
+public class HttpShardHandlerFactory extends ShardHandlerFactory implements PluginInfoInitialized {
+  protected static Logger log = LoggerFactory.getLogger(HttpShardHandlerFactory.class);
+
+  // We want an executor that doesn't take up any resources if
+  // it's not used, so it could be created statically for
+  // the distributed search component if desired.
+  //
+  // Consider CallerRuns policy and a lower max threads to throttle
+  // requests at some point (or should we simply return failure?)
+  Executor commExecutor = new ThreadPoolExecutor(
+      0,
+      Integer.MAX_VALUE,
+      5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
+      new SynchronousQueue<Runnable>()  // directly hand off tasks
+  );
+
+  HttpClient client; // null until init(PluginInfo) has run
+  Random r = new Random();
+  int soTimeout = 0; //current default values
+  int connectionTimeout = 0; //current default values
+  int maxConnectionsPerHost = 20;
+
+  public String scheme = "http://"; //current default values
+  // socket timeout measured in ms, closes a socket if read
+  // takes longer than x ms to complete. throws
+  // java.net.SocketTimeoutException: Read timed out exception
+  static final String INIT_SO_TIMEOUT = "socketTimeout";
+
+  // connection timeout measured in ms, closes a socket if connection
+  // cannot be established within x ms. with a
+  // java.net.SocketTimeoutException: Connection timed out
+  static final String INIT_CONNECTION_TIMEOUT = "connTimeout";
+
+  // URL scheme to be used in distributed search.
+  static final String INIT_URL_SCHEME = "urlScheme";
+
+  // Maximum connections allowed per host
+  static final String INIT_MAX_CONNECTION_PER_HOST = "maxConnectionsPerHost";
+
+  /** Returns a new handler bound to this factory's shared executor and client. */
+  public ShardHandler getShardHandler() {
+    return new HttpShardHandler(this);
+  }
+
+  /** Applies the optional init args, then builds the shared HttpClient with retries disabled. */
+  public void init(PluginInfo info) {
+    if (info.initArgs != null) {
+      Object so = info.initArgs.get(INIT_SO_TIMEOUT);
+      if (so != null) {
+        soTimeout = (Integer) so;
+        log.info("Setting socketTimeout to: " + soTimeout);
+      }
+
+      Object urlScheme = info.initArgs.get(INIT_URL_SCHEME);
+      if (urlScheme != null) {
+        scheme = urlScheme + "://";
+        log.info("Setting urlScheme to: " + urlScheme);
+      }
+
+      Object co = info.initArgs.get(INIT_CONNECTION_TIMEOUT);
+      if (co != null) {
+        connectionTimeout = (Integer) co;
+        log.info("Setting connTimeout to: " + connectionTimeout);
+      }
+
+      Object maxConnections = info.initArgs.get(INIT_MAX_CONNECTION_PER_HOST);
+      if (maxConnections != null) {
+        maxConnectionsPerHost = (Integer) maxConnections;
+        log.info("Setting maxConnectionsPerHost to: " + maxConnectionsPerHost);
+      }
+    }
+
+    MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+    mgr.getParams().setDefaultMaxConnectionsPerHost(maxConnectionsPerHost);
+    mgr.getParams().setMaxTotalConnections(10000);
+    mgr.getParams().setConnectionTimeout(connectionTimeout);
+    mgr.getParams().setSoTimeout(soTimeout);
+    client = new HttpClient(mgr);
+
+    // prevent retries  (note: this didn't work when set on mgr.. needed to be set on client)
+    DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
+    client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
+  }
+}

Modified: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1298032&r1=1298031&r2=1298032&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Mar  7 18:17:26 2012
@@ -17,58 +17,40 @@
 
 package org.apache.solr.handler.component;
 
-import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.RTimer;
-import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.RTimer;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.client.solrj.SolrServer;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.util.plugin.PluginInfoInitialized;
 import org.apache.solr.util.plugin.SolrCoreAware;
-import org.apache.solr.core.SolrCore;
-import org.apache.lucene.queryParser.ParseException;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.HttpClient;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.*;
-import java.util.concurrent.*;
 
 /**
  *
  * Refer SOLR-281
  *
  */
-public class SearchHandler extends RequestHandlerBase implements SolrCoreAware
-{
+public class SearchHandler extends RequestHandlerBase implements SolrCoreAware, PluginInfoInitialized {
   static final String INIT_COMPONENTS = "components";
   static final String INIT_FIRST_COMPONENTS = "first-components";
   static final String INIT_LAST_COMPONENTS = "last-components";
 
-  // socket timeout measured in ms, closes a socket if read
-  // takes longer than x ms to complete. throws
-  // java.net.SocketTimeoutException: Read timed out exception
-  static final String INIT_SO_TIMEOUT = "shard-socket-timeout";
-
-  // connection timeout measures in ms, closes a socket if connection
-  // cannot be established within x ms. with a
-  // java.net.SocketTimeoutException: Connection timed out
-  static final String INIT_CONNECTION_TIMEOUT = "shard-connection-timeout";
-  static int soTimeout = 0; //current default values
-  static int connectionTimeout = 0; //current default values
-
   protected static Logger log = LoggerFactory.getLogger(SearchHandler.class);
 
   protected List<SearchComponent> components = null;
+  private ShardHandlerFactory shardHandlerFactory = new HttpShardHandlerFactory();
+  private PluginInfo shfInfo;
 
   protected List<String> getDefaultComponents()
   {
@@ -82,6 +64,16 @@ public class SearchHandler extends Reque
     return names;
   }
 
+  public void init(PluginInfo info) {
+    init(info.initArgs);
+    for (PluginInfo child : info.children) {
+      if("shardHandlerFactory".equals(child.type)){
+        this.shfInfo = child;
+        break;
+      }
+    }
+  }
+
   /**
    * Initialize the components based on name.  Note, if using {@link #INIT_FIRST_COMPONENTS} or {@link #INIT_LAST_COMPONENTS},
    * then the {@link DebugComponent} will always occur last.  If this is not desired, then one must explicitly declare all components using
@@ -136,17 +128,13 @@ public class SearchHandler extends Reque
       log.info("Adding  debug component:" + dbgCmp);
     }
 
-    Object co = initArgs.get(INIT_CONNECTION_TIMEOUT);
-    if (co != null) {
-      connectionTimeout = (Integer) co;
-      log.info("Setting shard-connection-timeout to: " + connectionTimeout);
+    if(shfInfo == null) {
+      Map m = new HashMap();
+      m.put("class", HttpShardHandlerFactory.class.getName());
+      shfInfo = new PluginInfo("ShardHandlerFactory", m, null, Collections.<PluginInfo>emptyList());
     }
 
-    Object so = initArgs.get(INIT_SO_TIMEOUT);
-    if (so != null) {
-      soTimeout = (Integer) so;
-      log.info("Setting shard-socket-timeout to: " + soTimeout);
-    }
+    shardHandlerFactory = core.createInitInstance(shfInfo, ShardHandlerFactory.class, null, null);
   }
 
   public List<SearchComponent> getComponents() {
@@ -168,6 +156,9 @@ public class SearchHandler extends Reque
 
     final RTimer timer = rb.isDebug() ? new RTimer() : null;
 
+    ShardHandler shardHandler1 = shardHandlerFactory.getShardHandler();
+    shardHandler1.checkDistributed(rb);
+
     if (timer == null) {
       // non-debugging prepare phase
       for( SearchComponent c : components ) {
@@ -216,8 +207,6 @@ public class SearchHandler extends Reque
     } else {
       // a distributed request
 
-      HttpCommComponent comm = new HttpCommComponent();
-
       if (rb.outgoing == null) {
         rb.outgoing = new LinkedList<ShardRequest>();
       }
@@ -264,7 +253,7 @@ public class SearchHandler extends Reque
               } else {
                 params.set(CommonParams.QT, shardHandler);
               }
-              comm.submit(sreq, shard, params);
+              shardHandler1.submit(sreq, shard, params);
             }
           }
 
@@ -273,13 +262,13 @@ public class SearchHandler extends Reque
           // the outgoing queue, send them out immediately (by exiting
           // this loop)
           while (rb.outgoing.size() == 0) {
-            ShardResponse srsp = comm.takeCompletedOrError();
+            ShardResponse srsp = shardHandler1.takeCompletedOrError();
             if (srsp == null) break;  // no more requests to wait for
 
             // Was there an exception?  If so, abort everything and
             // rethrow
             if (srsp.getException() != null) {
-              comm.cancelAll();
+              shardHandler1.cancelAll();
               if (srsp.getException() instanceof SolrException) {
                 throw (SolrException)srsp.getException();
               } else {
@@ -335,172 +324,3 @@ public class SearchHandler extends Reque
     return "$URL$";
   }
 }
-
-
-// TODO: generalize how a comm component can fit into search component framework
-// TODO: statics should be per-core singletons
-
-class HttpCommComponent {
-
-  // We want an executor that doesn't take up any resources if
-  // it's not used, so it could be created statically for
-  // the distributed search component if desired.
-  //
-  // Consider CallerRuns policy and a lower max threads to throttle
-  // requests at some point (or should we simply return failure?)
-  static Executor commExecutor = new ThreadPoolExecutor(
-          0,
-          Integer.MAX_VALUE,
-          5, TimeUnit.SECONDS, // terminate idle threads after 5 sec
-          new SynchronousQueue<Runnable>()  // directly hand off tasks
-  );
-
-
-  static HttpClient client;
-
-  static {
-    MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
-    mgr.getParams().setDefaultMaxConnectionsPerHost(20);
-    mgr.getParams().setMaxTotalConnections(10000);
-    mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
-    mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
-    // mgr.getParams().setStaleCheckingEnabled(false);
-    client = new HttpClient(mgr);    
-  }
-
-  CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
-  Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
-
-  HttpCommComponent() {
-  }
-
-  private static class SimpleSolrResponse extends SolrResponse {
-    long elapsedTime;
-    NamedList<Object> nl;
-    
-    @Override
-    public long getElapsedTime() {
-      return elapsedTime;
-    }
-
-    @Override
-    public NamedList<Object> getResponse() {
-      return nl;
-    }
-
-    @Override
-    public void setResponse(NamedList<Object> rsp) {
-      nl = rsp;
-    }
-  }
-
-  void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
-    Callable<ShardResponse> task = new Callable<ShardResponse>() {
-      public ShardResponse call() throws Exception {
-
-        ShardResponse srsp = new ShardResponse();
-        srsp.setShardRequest(sreq);
-        srsp.setShard(shard);
-        SimpleSolrResponse ssr = new SimpleSolrResponse();
-        srsp.setSolrResponse(ssr);
-        long startTime = System.currentTimeMillis();
-
-        try {
-          // String url = "http://" + shard + "/select";
-          String url = "http://" + shard;
-
-          params.remove(CommonParams.WT); // use default (currently javabin)
-          params.remove(CommonParams.VERSION);
-
-          SolrServer server = new CommonsHttpSolrServer(url, client);
-          // SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
-          // use generic request to avoid extra processing of queries
-          QueryRequest req = new QueryRequest(params);
-          req.setMethod(SolrRequest.METHOD.POST);
-
-          // no need to set the response parser as binary is the default
-          // req.setResponseParser(new BinaryResponseParser());
-          // srsp.rsp = server.request(req);
-          // srsp.rsp = server.query(sreq.params);
-
-          ssr.nl = server.request(req);
-        } catch (Throwable th) {
-          srsp.setException(th);
-          if (th instanceof SolrException) {
-            srsp.setResponseCode(((SolrException)th).code());
-          } else {
-            srsp.setResponseCode(-1);
-          }
-        }
-
-        ssr.elapsedTime = System.currentTimeMillis() - startTime;
-
-        return srsp;
-      }
-    };
-
-    pending.add( completionService.submit(task) );
-  }
-
-  /** returns a ShardResponse of the last response correlated with a ShardRequest */
-  ShardResponse take() {
-    while (pending.size() > 0) {
-      try {
-        Future<ShardResponse> future = completionService.take();
-        pending.remove(future);
-        ShardResponse rsp = future.get();
-        rsp.getShardRequest().responses.add(rsp);
-        if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
-          return rsp;
-        }
-      } catch (InterruptedException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      } catch (ExecutionException e) {
-        // should be impossible... the problem with catching the exception
-        // at this level is we don't know what ShardRequest it applied to
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
-      }
-    }
-    return null;
-  }
-
-
-  /** returns a ShardResponse of the last response correlated with a ShardRequest,
-   * or immediately returns a ShardResponse if there was an error detected
-   */
-  ShardResponse takeCompletedOrError() {
-    while (pending.size() > 0) {
-      try {
-        Future<ShardResponse> future = completionService.take();
-        pending.remove(future);
-        ShardResponse rsp = future.get();
-        if (rsp.getException() != null) return rsp; // if exception, return immediately
-        // add response to the response list... we do this after the take() and
-        // not after the completion of "call" so we know when the last response
-        // for a request was received.  Otherwise we might return the same
-        // request more than once.
-        rsp.getShardRequest().responses.add(rsp);
-        if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
-          return rsp;
-        }
-      } catch (InterruptedException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      } catch (ExecutionException e) {
-        // should be impossible... the problem with catching the exception
-        // at this level is we don't know what ShardRequest it applied to
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception",e);
-      }
-    }
-    return null;
-  }
-
-
-  void cancelAll() {
-    for (Future<ShardResponse> future : pending) {
-      // TODO: any issues with interrupting?  shouldn't be if
-      // there are finally blocks to release connections.
-      future.cancel(true);
-    }
-  }
-
-}

Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandler.java Wed Mar  7 18:17:26 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+public abstract class ShardHandler { // what SearchHandler needs to fan one request out to shards
+  public abstract void checkDistributed(ResponseBuilder rb); // SearchHandler calls this before its prepare phase
+  public abstract void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) ; // send params to one shard
+  public abstract ShardResponse takeCompletedOrError(); // SearchHandler treats null as "no more requests to wait for"
+  public abstract void cancelAll(); // SearchHandler calls this once a response carries an exception
+}

Added: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java?rev=1298032&view=auto
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java (added)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardHandlerFactory.java Wed Mar  7 18:17:26 2012
@@ -0,0 +1,5 @@
+package org.apache.solr.handler.component;
+
+public abstract class ShardHandlerFactory { // pluggable source of ShardHandler instances
+  public abstract ShardHandler getShardHandler(); // HttpShardHandlerFactory returns a new handler on every call
+}

Modified: lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java?rev=1298032&r1=1298031&r2=1298032&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java (original)
+++ lucene/dev/branches/branch_3x/solr/core/src/java/org/apache/solr/handler/component/ShardResponse.java Wed Mar  7 18:17:26 2012
@@ -56,12 +56,12 @@ public final class ShardResponse {
     return shard;
   }
 
-  void setShardRequest(ShardRequest rsp)
+  public void setShardRequest(ShardRequest rsp)
   {
     this.req = rsp;
   }
 
-  void setSolrResponse(SolrResponse rsp)
+  public void setSolrResponse(SolrResponse rsp)
   {
     this.rsp = rsp;
   }