You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/09/11 06:03:04 UTC

svn commit: r1521726 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/handler/loader/ solr/core/src/java/org/apache/solr/update/ solr/core/src/test/org/apache/solr/cloud/ solr/solrj/ solr/solrj/src/java/org/ap...

Author: markrmiller
Date: Wed Sep 11 04:03:04 2013
New Revision: 1521726

URL: http://svn.apache.org/r1521726
Log:
SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node update forwarding.
SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin.
SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default.

Removed:
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
    lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
    lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java
    lucene/dev/branches/branch_4x/solr/test-framework/   (props changed)
    lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Wed Sep 11 04:03:04 2013
@@ -56,6 +56,15 @@ Upgrading from Solr 4.4.0
   init param option is now deprecated and should be replaced with the more standard
   <arr name="fieldName">.  See SOLR-4249 for more details.
 
+* UpdateRequestExt has been removed as part of SOLR-4816. You should use UpdateRequest 
+  instead.
+  
+* CloudSolrServer can now use multiple threads to add documents by default. This is a
+  small change in runtime semantics when using the bulk add method - you will still
+  end up with the same exception on a failure, but some documents beyond the one that
+  failed may have made it in. To get the old, single threaded behavior, set parallel updates
+  to false on the CloudSolrServer instance.  
+
 Detailed Change List
 ----------------------
 
@@ -107,7 +116,11 @@ New Features
   can specify facet.threads to parallelize loading the uninverted fields. In at least
   one extreme case this reduced warmup time from 20 seconds to 3 seconds. (Janne Majaranta,
   Gun Akkor via Erick Erickson)
+
+* SOLR-4816: CloudSolrServer can now route updates locally and no longer relies on inter-node
+  update forwarding.  (Joel Bernstein, Mark Miller)
   
+* SOLR-3249: Allow CloudSolrServer and SolrCmdDistributor to use JavaBin. (Mark Miller)  
 
 Bug Fixes
 ----------------------
@@ -193,6 +206,9 @@ Optimizations
 
  * SOLR-5057: QueryResultCache should not related with the order of fq's list (Feihong Huang via Erick Erickson)
 
+* SOLR-4816: CloudSolrServer now uses multiple threads to send updates by default.  
+  (Joel Bernstein via Mark Miller)
+
 Other Changes
 ----------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java Wed Sep 11 04:03:04 2013
@@ -37,6 +37,9 @@ import org.slf4j.LoggerFactory;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * Update handler which uses the JavaBin format
@@ -97,7 +100,7 @@ public class JavabinLoader extends Conte
       } catch (EOFException e) {
         break; // this is expected
       }
-      if (update.getDeleteById() != null || update.getDeleteQuery() != null) {
+      if (update.getDeleteByIdMap() != null || update.getDeleteQuery() != null) {
         delete(req, update, processor);
       }
     }
@@ -118,9 +121,17 @@ public class JavabinLoader extends Conte
       delcmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
     }
     
-    if(update.getDeleteById() != null) {
-      for (String s : update.getDeleteById()) {
-        delcmd.id = s;
+    if(update.getDeleteByIdMap() != null) {
+      Set<Entry<String,Map<String,Object>>> entries = update.getDeleteByIdMap().entrySet();
+      for (Entry<String,Map<String,Object>> e : entries) {
+        delcmd.id = e.getKey();
+        Map<String,Object> map = e.getValue();
+        if (map != null) {
+          Long version = (Long) map.get("ver");
+          if (version != null) {
+            delcmd.setVersion(version);
+          }
+        }
         processor.processDelete(delcmd);
         delcmd.clear();
       }

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Wed Sep 11 04:03:04 2013
@@ -17,6 +17,10 @@
 
 package org.apache.solr.update;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.BytesRef;
@@ -27,11 +31,6 @@ import org.apache.solr.request.SolrQuery
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.SchemaField;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
 /**
  *
  */

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Wed Sep 11 04:03:04 2013
@@ -34,7 +34,7 @@ import java.util.concurrent.RejectedExec
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -152,8 +152,9 @@ public class SolrCmdDistributor {
     // finish with the pending requests
     checkResponses(false);
 
-    UpdateRequestExt ureq = new UpdateRequestExt();
+    UpdateRequest ureq = new UpdateRequest();
     ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+
     ureq.setParams(params);
     syncRequest(node, ureq);
   }
@@ -173,7 +174,7 @@ public class SolrCmdDistributor {
     deleteRequest.cmd = clonedCmd;
     deleteRequest.params = params;
 
-    UpdateRequestExt ureq = new UpdateRequestExt();
+    UpdateRequest ureq = new UpdateRequest();
     if (cmd.isDeleteById()) {
       ureq.deleteById(cmd.getId(), cmd.getVersion());
     } else {
@@ -185,7 +186,7 @@ public class SolrCmdDistributor {
     }
   }
 
-  private void syncRequest(Node node, UpdateRequestExt ureq) {
+  private void syncRequest(Node node, UpdateRequest ureq) {
     Request sreq = new Request();
     sreq.node = node;
     sreq.ureq = ureq;
@@ -224,7 +225,7 @@ public class SolrCmdDistributor {
     
     // currently, we dont try to piggy back on outstanding adds or deletes
     
-    UpdateRequestExt ureq = new UpdateRequestExt();
+    UpdateRequest ureq = new UpdateRequest();
     ureq.setParams(params);
     
     addCommit(ureq, cmd);
@@ -265,7 +266,7 @@ public class SolrCmdDistributor {
     flushDeletes(maxBufferedDeletesPerServer);
   }
   
-  void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+  void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
     if (cmd == null) return;
     ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes);
@@ -281,14 +282,13 @@ public class SolrCmdDistributor {
       List<AddRequest> alist = adds.get(node);
       if (alist == null || alist.size() < limit) continue;
   
-      UpdateRequestExt ureq = new UpdateRequestExt();
+      UpdateRequest ureq = new UpdateRequest();
       
       ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-      
+
       for (AddRequest aReq : alist) {
         AddUpdateCommand cmd = aReq.cmd;
         combinedParams.add(aReq.params);
-       
         ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
       }
       
@@ -315,7 +315,7 @@ public class SolrCmdDistributor {
     for (Node node : nodes) {
       List<DeleteRequest> dlist = deletes.get(node);
       if (dlist == null || dlist.size() < limit) continue;
-      UpdateRequestExt ureq = new UpdateRequestExt();
+      UpdateRequest ureq = new UpdateRequest();
       
       ModifiableSolrParams combinedParams = new ModifiableSolrParams();
       
@@ -354,14 +354,14 @@ public class SolrCmdDistributor {
   
   public static class Request {
     public Node node;
-    UpdateRequestExt ureq;
+    UpdateRequest ureq;
     NamedList<Object> ursp;
     int rspCode;
     public Exception exception;
     int retries;
   }
   
-  void submit(UpdateRequestExt ureq, Node node) {
+  void submit(UpdateRequest ureq, Node node) {
     Request sreq = new Request();
     sreq.node = node;
     sreq.ureq = ureq;

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Wed Sep 11 04:03:04 2013
@@ -17,7 +17,6 @@
 
 package org.apache.solr.update;
 
-import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.request.SolrQueryRequest;
 
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/AliasIntegrationTest.java Wed Sep 11 04:03:04 2013
@@ -153,6 +153,7 @@ public class AliasIntegrationTest extend
     
     // search with new cloud client
     CloudSolrServer cloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+    cloudSolrServer.setParallelUpdates(random().nextBoolean());
     query = new SolrQuery("*:*");
     query.set("collection", "testalias");
     res = cloudSolrServer.query(query);

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Wed Sep 11 04:03:04 2013
@@ -1104,6 +1104,7 @@ public class BasicDistributedZkTest exte
       synchronized(this) {
         try {
           commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+          commondCloudSolrServer.setParallelUpdates(random().nextBoolean());
           commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
           commondCloudSolrServer.getLbServer().setConnectionTimeout(15000);
           commondCloudSolrServer.getLbServer().setSoTimeout(30000);

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BinaryRequestWriter.java Wed Sep 11 04:03:04 2013
@@ -41,7 +41,7 @@ public class BinaryRequestWriter extends
     if (req instanceof UpdateRequest) {
       UpdateRequest updateRequest = (UpdateRequest) req;
       if (isNull(updateRequest.getDocuments()) &&
-              isNull(updateRequest.getDeleteById()) &&
+              isNull(updateRequest.getDeleteByIdMap()) &&
               isNull(updateRequest.getDeleteQuery())
               && (updateRequest.getDocIterator() == null) ) {
         return null;

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Wed Sep 11 04:03:04 2013
@@ -30,18 +30,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -49,7 +59,9 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.KeeperException;
 
@@ -58,6 +70,10 @@ import org.apache.zookeeper.KeeperExcept
  * Instances of this class communicate with Zookeeper to discover
  * Solr endpoints for SolrCloud collections, and then use the 
  * {@link LBHttpSolrServer} to issue requests.
+ * 
+ * This class assumes the id field for your documents is called
+ * 'id' - if this is not the case, you must set the right name
+ * with {@link #setIdField(String)}.
  */
 public class CloudSolrServer extends SolrServer {
   private volatile ZkStateReader zkStateReader;
@@ -65,7 +81,7 @@ public class CloudSolrServer extends Sol
   private int zkConnectTimeout = 10000;
   private int zkClientTimeout = 10000;
   private volatile String defaultCollection;
-  private LBHttpSolrServer lbServer;
+  private final LBHttpSolrServer lbServer;
   private HttpClient myClient;
   Random rand = new Random();
   
@@ -79,8 +95,31 @@ public class CloudSolrServer extends Sol
   private volatile int lastClusterStateHashCode;
   
   private final boolean updatesToLeaders;
+  private boolean parallelUpdates = true;
+  private ExecutorService threadPool = Executors
+      .newCachedThreadPool(new SolrjNamedThreadFactory(
+          "CloudSolrServer ThreadPool"));
+  private String idField = "id";
+  private final Set<String> NON_ROUTABLE_PARAMS;
+  {
+    NON_ROUTABLE_PARAMS = new HashSet<String>();
+    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+    
+    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+    
+    // Not supported via SolrCloud
+    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+  }
+
+
 
-  
   /**
    * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
    * in the form HOST:PORT.
@@ -92,12 +131,13 @@ public class CloudSolrServer extends Sol
       this.updatesToLeaders = true;
   }
   
-  public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException {
+  public CloudSolrServer(String zkHost, boolean updatesToLeaders)
+      throws MalformedURLException {
     this.zkHost = zkHost;
     this.myClient = HttpClientUtil.createClient(null);
     this.lbServer = new LBHttpSolrServer(myClient);
     this.updatesToLeaders = updatesToLeaders;
-}
+  }
 
   /**
    * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@@ -121,10 +161,40 @@ public class CloudSolrServer extends Sol
     this.lbServer = lbServer;
     this.updatesToLeaders = updatesToLeaders;
   }
+  
+  public ResponseParser getParser() {
+    return lbServer.getParser();
+  }
+  
+  /**
+   * Note: This setter method is <b>not thread-safe</b>.
+   * 
+   * @param processor
+   *          Default Response Parser chosen to parse the response if the parser
+   *          were not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(ResponseParser processor) {
+    lbServer.setParser(processor);
+  }
 
   public ZkStateReader getZkStateReader() {
     return zkStateReader;
   }
+
+  /**
+   * @param idField the field to route documents on.
+   */
+  public void setIdField(String idField) {
+    this.idField = idField;
+  }
+
+  /**
+   * @return the field that updates are routed on.
+   */
+  public String getIdField() {
+    return idField;
+  }
   
   /** Sets the default collection for request */
   public void setDefaultCollection(String collection) {
@@ -179,18 +249,293 @@ public class CloudSolrServer extends Sol
     }
   }
 
+  public void setParallelUpdates(boolean parallelUpdates) {
+    this.parallelUpdates = parallelUpdates;
+  }
+
+  private NamedList directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException {
+    UpdateRequest updateRequest = (UpdateRequest) request;
+    ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+    ModifiableSolrParams routableParams = new ModifiableSolrParams();
+    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+    if(params != null) {
+      nonRoutableParams.add(params);
+      routableParams.add(params);
+      for(String param : NON_ROUTABLE_PARAMS) {
+        routableParams.remove(param);
+      }
+    }
+    if (params == null) {
+      return null;
+    }
+
+    String collection = params.get("collection", defaultCollection);
+    if (collection == null) {
+      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+    }
+
+
+    //Check to see if the collection is an alias.
+    Aliases aliases = zkStateReader.getAliases();
+    if(aliases != null) {
+      Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
+      if(collectionAliases != null && collectionAliases.containsKey(collection)) {
+        collection = collectionAliases.get(collection);
+      }
+    }
+
+    DocCollection col = clusterState.getCollection(collection);
+
+    DocRouter router = col.getRouter();
+    
+    if (router instanceof ImplicitDocRouter) {
+      // short circuit as optimization
+      return null;
+    }
+
+    //Create the URL map, which is keyed on slice name.
+    //The value is a list of URLs for each replica in the slice.
+    //The first value in the list is the leader for the slice.
+    Map<String,List<String>> urlMap = buildUrlMap(col);
+
+    NamedList exceptions = new NamedList();
+    NamedList shardResponses = new NamedList();
+
+    Map<String, LBHttpSolrServer.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+    if (routes == null) {
+      return null;
+    }
+
+    Iterator<Map.Entry<String, LBHttpSolrServer.Req>> it = routes.entrySet().iterator();
+
+    long start = System.nanoTime();
+    if(this.parallelUpdates) {
+      ArrayBlockingQueue<RequestTask> finishedTasks = new ArrayBlockingQueue<RequestTask>(routes.size());
+
+      while (it.hasNext()) {
+        Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
+        String url = entry.getKey();
+        LBHttpSolrServer.Req lbRequest = entry.getValue();
+        threadPool.execute(new RequestTask(url, lbRequest, finishedTasks));
+      }
+
+      while ((shardResponses.size() + exceptions.size()) != routes.size()) {
+        RequestTask requestTask = null;
+        try {
+          requestTask = finishedTasks.take();
+        } catch (Exception e) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, e);
+        }
+
+        Exception e = requestTask.getException();
+        if (e != null) {
+          exceptions.add(requestTask.getLeader(), e);
+        } else {
+          shardResponses.add(requestTask.getLeader(), requestTask.getRsp().getResponse());
+        }
+      }
+
+      if(exceptions.size() > 0) {
+        throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+      }
+    } else {
+      while (it.hasNext()) {
+        Map.Entry<String, LBHttpSolrServer.Req> entry = it.next();
+        String url = entry.getKey();
+        LBHttpSolrServer.Req lbRequest = entry.getValue();
+        try{
+          NamedList rsp = lbServer.request(lbRequest).getResponse();
+          shardResponses.add(url, rsp);
+        } catch(Exception e) {
+          throw new SolrServerException(e);
+        }
+      }
+    }
+
+    UpdateRequest nonRoutableRequest = null;
+    List<String> deleteQuery = updateRequest.getDeleteQuery();
+    if (deleteQuery != null && deleteQuery.size() > 0) {
+      UpdateRequest deleteQueryRequest = new UpdateRequest();
+      deleteQueryRequest.setDeleteQuery(deleteQuery);
+      nonRoutableRequest = deleteQueryRequest;
+    }
+    
+    Set<String> paramNames = nonRoutableParams.getParameterNames();
+    
+    Set<String> intersection = new HashSet<String>(paramNames);
+    intersection.retainAll(NON_ROUTABLE_PARAMS);
+    
+    if (nonRoutableRequest != null || intersection.size() > 0) {
+      if (nonRoutableRequest == null) {
+        nonRoutableRequest = new UpdateRequest();
+      }
+      nonRoutableRequest.setParams(nonRoutableParams);
+      List<String> urlList = new ArrayList<String>();
+      urlList.addAll(routes.keySet());
+      Collections.shuffle(urlList, rand);
+      LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(nonRoutableRequest, urlList);
+      try {
+        LBHttpSolrServer.Rsp rsp = lbServer.request(req);
+        shardResponses.add(urlList.get(0), rsp.getResponse());
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+      }
+    }
+
+    long end = System.nanoTime();
+
+    RouteResponse rr =  condenseResponse(shardResponses, (long)((end - start)/1000000));
+    rr.setRouteResponses(shardResponses);
+    rr.setRoutes(routes);
+    return rr;
+  }
+
+  private Map<String,List<String>> buildUrlMap(DocCollection col) {
+    Map<String, List<String>> urlMap = new HashMap<String, List<String>>();
+    Collection<Slice> slices = col.getActiveSlices();
+    Iterator<Slice> sliceIterator = slices.iterator();
+    while (sliceIterator.hasNext()) {
+      Slice slice = sliceIterator.next();
+      String name = slice.getName();
+      List<String> urls = new ArrayList<String>();
+      Replica leader = slice.getLeader();
+      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+      String url = zkProps.getBaseUrl() + "/" + col.getName();
+      urls.add(url);
+      Collection<Replica> replicas = slice.getReplicas();
+      Iterator<Replica> replicaIterator = replicas.iterator();
+      while (replicaIterator.hasNext()) {
+        Replica replica = replicaIterator.next();
+        if (!replica.getNodeName().equals(leader.getNodeName()) &&
+            !replica.getName().equals(leader.getName())) {
+          ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+          String url1 = zkProps1.getBaseUrl() + "/" + col.getName();
+          urls.add(url1);
+        }
+      }
+      urlMap.put(name, urls);
+    }
+    return urlMap;
+  }
+
+  public RouteResponse condenseResponse(NamedList response, long timeMillis) {
+    RouteResponse condensed = new RouteResponse();
+    int status = 0;
+    for(int i=0; i<response.size(); i++) {
+      NamedList shardResponse = (NamedList)response.getVal(i);
+      NamedList header = (NamedList)shardResponse.get("responseHeader");
+      Integer shardStatus = (Integer)header.get("status");
+      int s = shardStatus.intValue();
+      if(s > 0) {
+          status = s;
+      }
+    }
+
+    NamedList cheader = new NamedList();
+    cheader.add("status", status);
+    cheader.add("QTime", timeMillis);
+    condensed.add("responseHeader", cheader);
+    return condensed;
+  }
+
+  class RequestTask implements Runnable {
+
+    private LBHttpSolrServer.Req req;
+    private ArrayBlockingQueue<RequestTask> tasks;
+    private LBHttpSolrServer.Rsp rsp;
+    private String leader;
+    private Exception e;
+
+    public RequestTask(String leader, LBHttpSolrServer.Req req, ArrayBlockingQueue<RequestTask> tasks) {
+      this.req = req;
+      this.tasks = tasks;
+      this.leader = leader;
+    }
+
+    public void run() {
+      try {
+        LBHttpSolrServer lb = new LBHttpSolrServer(myClient);
+        this.rsp = lb.request(req);
+        this.tasks.add(this);
+      } catch (Exception e) {
+        this.e = e;
+        this.tasks.add(this);
+      }
+    }
+
+    public Exception getException() {
+      return e;
+    }
+
+    public String getLeader() {
+      return this.leader;
+    }
+
+    public LBHttpSolrServer.Rsp getRsp() {
+      return rsp;
+    }
+  }
+
+  class RouteResponse extends NamedList {
+    private NamedList routeResponses;
+    private Map<String, LBHttpSolrServer.Req> routes;
+
+    public void setRouteResponses(NamedList routeResponses) {
+      this.routeResponses = routeResponses;
+    }
+
+    public NamedList getRouteResponses() {
+      return routeResponses;
+    }
+
+    public void setRoutes(Map<String, LBHttpSolrServer.Req> routes) {
+      this.routes = routes;
+    }
+
+    public Map<String, LBHttpSolrServer.Req> getRoutes() {
+      return routes;
+    }
+
+  }
+
+  class RouteException extends SolrException {
+
+    private NamedList exceptions;
+    private Map<String, LBHttpSolrServer.Req> routes;
+
+    public RouteException(ErrorCode errorCode, NamedList exceptions, Map<String, LBHttpSolrServer.Req> routes){
+      super(errorCode, ((Exception)exceptions.getVal(0)).getMessage(), (Exception)exceptions.getVal(0));
+      this.exceptions = exceptions;
+      this.routes = routes;
+    }
+
+    public NamedList getExceptions() {
+      return exceptions;
+    }
+
+    public Map<String, LBHttpSolrServer.Req> getRoutes() {
+      return this.routes;
+    }
+  }
+
   @Override
   public NamedList<Object> request(SolrRequest request)
       throws SolrServerException, IOException {
     connect();
     
-    // TODO: if you can hash here, you could favor the shard leader
-    
     ClusterState clusterState = zkStateReader.getClusterState();
+
     boolean sendToLeaders = false;
     List<String> replicas = null;
     
-    if (request instanceof IsUpdateRequest && updatesToLeaders) {
+    if (request instanceof IsUpdateRequest) {
+      if(request instanceof UpdateRequest) {
+        NamedList response = directUpdate((AbstractUpdateRequest)request,clusterState);
+        if(response != null) {
+          return response;
+        }
+      }
       sendToLeaders = true;
       replicas = new ArrayList<String>();
     }
@@ -355,6 +700,10 @@ public class CloudSolrServer extends Sol
     if (myClient!=null) {
       myClient.getConnectionManager().shutdown();
     }
+
+    if(this.threadPool != null && !this.threadPool.isShutdown()) {
+      this.threadPool.shutdown();
+    }
   }
 
   public LBHttpSolrServer getLbServer() {

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java Wed Sep 11 04:03:04 2013
@@ -98,14 +98,14 @@ public class HttpSolrServer extends Solr
    * 
    * @see org.apache.solr.client.solrj.impl.BinaryResponseParser
    */
-  protected ResponseParser parser;
+  protected volatile ResponseParser parser;
   
   /**
    * The RequestWriter used to write all requests to Solr
    * 
    * @see org.apache.solr.client.solrj.request.RequestWriter
    */
-  protected RequestWriter requestWriter = new RequestWriter();
+  protected volatile RequestWriter requestWriter = new RequestWriter();
   
   private final HttpClient httpClient;
   

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Wed Sep 11 04:03:04 2013
@@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.imp
 
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.*;
+import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -93,7 +94,8 @@ public class LBHttpSolrServer extends So
   private final AtomicInteger counter = new AtomicInteger(-1);
 
   private static final SolrQuery solrQuery = new SolrQuery("*:*");
-  private final ResponseParser parser;
+  private volatile ResponseParser parser;
+  private volatile RequestWriter requestWriter;
 
   static {
     solrQuery.setRows(0);
@@ -219,11 +221,13 @@ public class LBHttpSolrServer extends So
   }
 
   protected HttpSolrServer makeServer(String server) throws MalformedURLException {
-    return new HttpSolrServer(server, httpClient, parser);
+    HttpSolrServer s = new HttpSolrServer(server, httpClient, parser);
+    if (requestWriter != null) {
+      s.setRequestWriter(requestWriter);
+    }
+    return s;
   }
 
-
-
   /**
    * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
    * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
@@ -590,6 +594,18 @@ public class LBHttpSolrServer extends So
     return httpClient;
   }
 
+  public ResponseParser getParser() {
+    return parser;
+  }
+  
+  public void setParser(ResponseParser parser) {
+    this.parser = parser;
+  }
+  
+  public void setRequestWriter(RequestWriter requestWriter) {
+    this.requestWriter = requestWriter;
+  }
+  
   @Override
   protected void finalize() throws Throwable {
     try {
@@ -603,4 +619,5 @@ public class LBHttpSolrServer extends So
   // defaults
   private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
   private static final int NONSTANDARD_PING_LIMIT = 5;  // number of times we'll ping dead servers not in the server list
+
 }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Wed Sep 11 04:03:04 2013
@@ -23,6 +23,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -63,11 +66,14 @@ public class JavaBinUpdateRequestCodec {
     if(updateRequest.getDocIterator() != null){
       docIter = updateRequest.getDocIterator();
     }
+    
+    Map<SolrInputDocument,Map<String,Object>> docMap = updateRequest.getDocumentsMap();
 
     nl.add("params", params);// 0: params
-    nl.add("delById", updateRequest.getDeleteById());
+    nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
     nl.add("delByQ", updateRequest.getDeleteQuery());
     nl.add("docs", docIter);
+    nl.add("docsMap", docMap);
     JavaBinCodec codec = new JavaBinCodec();
     codec.marshal(nl, os);
   }
@@ -86,7 +92,9 @@ public class JavaBinUpdateRequestCodec {
   public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
     final UpdateRequest updateRequest = new UpdateRequest();
     List<List<NamedList>> doclist;
+    Map<SolrInputDocument,Map<String,Object>>  docMap;
     List<String> delById;
+    Map<String,Long> delByIdMap;
     List<String> delByQ;
     final NamedList[] namedList = new NamedList[1];
     JavaBinCodec codec = new JavaBinCodec() {
@@ -158,9 +166,11 @@ public class JavaBinUpdateRequestCodec {
       }
     }
     delById = (List<String>) namedList[0].get("delById");
+    delByIdMap = (Map<String,Long>) namedList[0].get("delByIdMap");
     delByQ = (List<String>) namedList[0].get("delByQ");
     doclist = (List) namedList[0].get("docs");
-
+    docMap =  (Map<SolrInputDocument,Map<String,Object>>) namedList[0].get("docsMap");
+    
     if (doclist != null && !doclist.isEmpty()) {
       List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
       for (Object o : doclist) {
@@ -172,11 +182,29 @@ public class JavaBinUpdateRequestCodec {
       }
       updateRequest.add(solrInputDocs);
     }
+    if (docMap != null && !docMap.isEmpty()) {
+      Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docMap.entrySet();
+      for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+        Map<String,Object> map = entry.getValue();
+        Boolean overwrite = null;
+        Integer commitWithin = null;
+        if (map != null) {
+          overwrite = (Boolean) map.get(UpdateRequest.OVERWRITE);
+          commitWithin = (Integer) map.get(UpdateRequest.COMMIT_WITHIN);
+        }
+        updateRequest.add(entry.getKey(), commitWithin, overwrite);
+      }
+    }
     if (delById != null) {
       for (String s : delById) {
         updateRequest.deleteById(s);
       }
     }
+    if (delByIdMap != null) {
+      for (Map.Entry<String,Long> entry : delByIdMap.entrySet()) {
+        updateRequest.deleteById(entry.getKey(), entry.getValue());
+      }
+    }
     if (delByQ != null) {
       for (String s : delByQ) {
         updateRequest.deleteByQuery(s);

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java Wed Sep 11 04:03:04 2013
@@ -26,6 +26,7 @@ import java.io.*;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.nio.charset.Charset;
 
 /**
@@ -52,7 +53,7 @@ public class RequestWriter {
 
   private boolean isEmpty(UpdateRequest updateRequest) {
     return isNull(updateRequest.getDocuments()) &&
-            isNull(updateRequest.getDeleteById()) &&
+            isNull(updateRequest.getDeleteByIdMap()) &&
             isNull(updateRequest.getDeleteQuery()) &&
             updateRequest.getDocIterator() == null;
   }
@@ -137,4 +138,8 @@ public class RequestWriter {
   protected boolean isNull(List l) {
     return l == null || l.isEmpty();
   }
+  
+  protected boolean isNull(Map l) {
+    return l == null || l.isEmpty();
+  }
 }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Wed Sep 11 04:03:04 2013
@@ -22,203 +22,411 @@ import java.io.StringWriter;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.LinkedHashMap;
 
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.util.XML;
 
 /**
  * 
- *
+ * 
  * @since solr 1.3
  */
 public class UpdateRequest extends AbstractUpdateRequest {
   
-  private List<SolrInputDocument> documents = null;
+  public static final String OVERWRITE = "ow";
+  public static final String COMMIT_WITHIN = "cw";
+  private Map<SolrInputDocument,Map<String,Object>> documents = null;
   private Iterator<SolrInputDocument> docIterator = null;
-  private List<String> deleteById = null;
+  private Map<String,Map<String,Object>> deleteById = null;
   private List<String> deleteQuery = null;
-
-  public UpdateRequest()
-  {
-    super( METHOD.POST, "/update" );
+  
+  public UpdateRequest() {
+    super(METHOD.POST, "/update");
   }
-
+  
   public UpdateRequest(String url) {
     super(METHOD.POST, url);
   }
-
-  //---------------------------------------------------------------------------
-  //---------------------------------------------------------------------------
+  
+  // ---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
   
   /**
    * clear the pending documents and delete commands
    */
-  public void clear()
-  {
-    if( documents != null ) {
+  public void clear() {
+    if (documents != null) {
       documents.clear();
     }
-    if( deleteById != null ) {
+    if (deleteById != null) {
       deleteById.clear();
     }
-    if( deleteQuery != null ) {
+    if (deleteQuery != null) {
       deleteQuery.clear();
     }
   }
   
-  //---------------------------------------------------------------------------
-  //---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
+  
+  public UpdateRequest add(final SolrInputDocument doc) {
+    if (documents == null) {
+      documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+    }
+    documents.put(doc, null);
+    return this;
+  }
+  
+  public UpdateRequest add(final SolrInputDocument doc, Boolean overwrite) {
+    return add(doc, null, overwrite);
+  }
+  
+  public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin) {
+    return add(doc, commitWithin, null);
+  }
   
-  public UpdateRequest add( final SolrInputDocument doc )
-  {
-    if( documents == null ) {
-      documents = new ArrayList<SolrInputDocument>( 2 );
+  public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin,
+      Boolean overwrite) {
+    if (documents == null) {
+      documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
     }
-    documents.add( doc );
+    Map<String,Object> params = new HashMap<String,Object>(2);
+    if (commitWithin != null) params.put(COMMIT_WITHIN, commitWithin);
+    if (overwrite != null) params.put(OVERWRITE, overwrite);
+    
+    documents.put(doc, params);
+    
     return this;
   }
   
-  public UpdateRequest add( final Collection<SolrInputDocument> docs )
-  {
-    if( documents == null ) {
-      documents = new ArrayList<SolrInputDocument>( docs.size()+1 );
+  public UpdateRequest add(final Collection<SolrInputDocument> docs) {
+    if (documents == null) {
+      documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+    }
+    for (SolrInputDocument doc : docs) {
+      documents.put(doc, null);
     }
-    documents.addAll( docs );
     return this;
   }
   
-  public UpdateRequest deleteById( String id )
-  {
-    if( deleteById == null ) {
-      deleteById = new ArrayList<String>();
+  public UpdateRequest deleteById(String id) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<String,Map<String,Object>>();
     }
-    deleteById.add( id );
+    deleteById.put(id, null);
     return this;
   }
-  public UpdateRequest deleteById( List<String> ids )
-  {
-    if( deleteById == null ) {
-      deleteById = new ArrayList<String>(ids);
-    } else {
-      deleteById.addAll(ids);
+  
+  public UpdateRequest deleteById(List<String> ids) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<String,Map<String,Object>>();
     }
+    
+    for (String id : ids) {
+      deleteById.put(id, null);
+    }
+    
     return this;
   }
   
-  public UpdateRequest deleteByQuery( String q )
-  {
-    if( deleteQuery == null ) {
+  public UpdateRequest deleteById(String id, Long version) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<String,Map<String,Object>>();
+    }
+    Map<String,Object> params = new HashMap<String,Object>(1);
+    params.put("ver", version);
+    deleteById.put(id, params);
+    return this;
+  }
+  
+  public UpdateRequest deleteByQuery(String q) {
+    if (deleteQuery == null) {
       deleteQuery = new ArrayList<String>();
     }
-    deleteQuery.add( q );
+    deleteQuery.add(q);
     return this;
   }
+  
+  /**
+   * @param router to route updates with
+   * @param col DocCollection for the updates
+   * @param urlMap of the cluster
+   * @param params params to use
+   * @param idField the id field
+   * @return a Map of urls to requests
+   */
+  public Map<String,LBHttpSolrServer.Req> getRoutes(DocRouter router,
+      DocCollection col, Map<String,List<String>> urlMap,
+      ModifiableSolrParams params, String idField) {
+    
+    if ((documents == null || documents.size() == 0)
+        && (deleteById == null || deleteById.size() == 0)) {
+      return null;
+    }
+    
+    Map<String,LBHttpSolrServer.Req> routes = new HashMap<String,LBHttpSolrServer.Req>();
+    if (documents != null) {
+      Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
+      for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+        SolrInputDocument doc = entry.getKey();
+        Object id = doc.getFieldValue(idField);
+        if (id == null) {
+          return null;
+        }
+        Slice slice = router.getTargetSlice(id
+            .toString(), doc, null, col);
+        if (slice == null) {
+          return null;
+        }
+        List<String> urls = urlMap.get(slice.getName());
+        String leaderUrl = urls.get(0);
+        LBHttpSolrServer.Req request = (LBHttpSolrServer.Req) routes
+            .get(leaderUrl);
+        if (request == null) {
+          UpdateRequest updateRequest = new UpdateRequest();
+          updateRequest.setMethod(getMethod());
+          updateRequest.setCommitWithin(getCommitWithin());
+          updateRequest.setParams(params);
+          updateRequest.setPath(getPath());
+          request = new LBHttpSolrServer.Req(updateRequest, urls);
+          routes.put(leaderUrl, request);
+        }
+        UpdateRequest urequest = (UpdateRequest) request.getRequest();
+        urequest.add(doc);
+      }
+    }
+    
+    // Route the deleteById's
+    
+    if (deleteById != null) {
+      
+      Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
+          .iterator();
+      while (entries.hasNext()) {
+        
+        Map.Entry<String,Map<String,Object>> entry = entries.next();
+        
+        String deleteId = entry.getKey();
+        Map<String,Object> map = entry.getValue();
+        Long version = null;
+        if (map != null) {
+          version = (Long) map.get("ver");
+        }
+        Slice slice = router.getTargetSlice(deleteId, null, null, col);
+        if (slice == null) {
+          return null;
+        }
+        List<String> urls = urlMap.get(slice.getName());
+        String leaderUrl = urls.get(0);
+        LBHttpSolrServer.Req request = routes.get(leaderUrl);
+        if (request != null) {
+          UpdateRequest urequest = (UpdateRequest) request.getRequest();
+          urequest.deleteById(deleteId, version);
+        } else {
+          UpdateRequest urequest = new UpdateRequest();
+          urequest.deleteById(deleteId, version);
+          request = new LBHttpSolrServer.Req(urequest, urls);
+          routes.put(leaderUrl, request);
+        }
+      }
+    }
 
+    return routes;
+  }
+  
   public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
     this.docIterator = docIterator;
   }
-
-  //--------------------------------------------------------------------------
-  //--------------------------------------------------------------------------
-
+  
+  public void setDeleteQuery(List<String> deleteQuery) {
+    this.deleteQuery = deleteQuery;
+  }
+  
+  // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
+  
   @Override
   public Collection<ContentStream> getContentStreams() throws IOException {
-    return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML );
+    return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
   }
-
+  
   public String getXML() throws IOException {
     StringWriter writer = new StringWriter();
-    writeXML( writer );
+    writeXML(writer);
     writer.flush();
-
+    
     // If action is COMMIT or OPTIMIZE, it is sent with params
     String xml = writer.toString();
-    //System.out.println( "SEND:"+xml );
+    // System.out.println( "SEND:"+xml );
     return (xml.length() > 0) ? xml : null;
   }
   
+  private List<Map<SolrInputDocument,Map<String,Object>>> getDocLists(Map<SolrInputDocument,Map<String,Object>> documents) {
+    List<Map<SolrInputDocument,Map<String,Object>>> docLists = new ArrayList<Map<SolrInputDocument,Map<String,Object>>>();
+    Map<SolrInputDocument,Map<String,Object>> docList = null;
+    if (this.documents != null) {
+      
+      Boolean lastOverwrite = true;
+      Integer lastCommitWithin = -1;
+      
+      Set<Entry<SolrInputDocument,Map<String,Object>>> entries = this.documents
+          .entrySet();
+      for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+        Map<String,Object> map = entry.getValue();
+        Boolean overwrite = null;
+        Integer commitWithin = null;
+        if (map != null) {
+          overwrite = (Boolean) entry.getValue().get(OVERWRITE);
+          commitWithin = (Integer) entry.getValue().get(COMMIT_WITHIN);
+        }
+        if (overwrite != lastOverwrite || commitWithin != lastCommitWithin
+            || docLists.size() == 0) {
+          docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+          docLists.add(docList);
+        }
+        docList.put(entry.getKey(), entry.getValue());
+        lastCommitWithin = commitWithin;
+        lastOverwrite = overwrite;
+      }
+    }
+    
+    if (docIterator != null) {
+      docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+      docLists.add(docList);
+      while (docIterator.hasNext()) {
+        SolrInputDocument doc = docIterator.next();
+        if (doc != null) {
+          docList.put(doc, null);
+        }
+      }
+      
+    }
+
+    return docLists;
+  }
+  
   /**
    * @since solr 1.4
    */
-  public void writeXML( Writer writer ) throws IOException {
-    if( (documents != null && documents.size() > 0) || docIterator != null) {
-      if( commitWithin > 0 ) {
-        writer.write("<add commitWithin=\""+commitWithin+"\">");
-      }
-      else {
-        writer.write("<add>");
-      }
-      if(documents != null) {
-        for (SolrInputDocument doc : documents) {
-          if (doc != null) {
-            ClientUtils.writeXML(doc, writer);
-          }
+  public void writeXML(Writer writer) throws IOException {
+    List<Map<SolrInputDocument,Map<String,Object>>> getDocLists = getDocLists(documents);
+    
+    for (Map<SolrInputDocument,Map<String,Object>> docs : getDocLists) {
+      
+      if ((docs != null && docs.size() > 0)) {
+        Entry<SolrInputDocument,Map<String,Object>> firstDoc = docs.entrySet()
+            .iterator().next();
+        Map<String,Object> map = firstDoc.getValue();
+        Integer cw = null;
+        Boolean ow = null;
+        if (map != null) {
+          cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN);
+          ow = (Boolean) firstDoc.getValue().get(OVERWRITE);
         }
-      }
-      if (docIterator != null) {
-        while (docIterator.hasNext()) {
-          SolrInputDocument doc = docIterator.next();
-          if (doc != null) {
-            ClientUtils.writeXML(doc, writer);
-          }
+        if (ow == null) ow = true;
+        int commitWithin = (cw != null && cw != -1) ? cw : this.commitWithin;
+        boolean overwrite = ow;
+        if (commitWithin > -1 || overwrite != true) {
+          writer.write("<add commitWithin=\"" + commitWithin + "\" "
+              + "overwrite=\"" + overwrite + "\">");
+        } else {
+          writer.write("<add>");
+        }
+        
+        Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docs
+            .entrySet();
+        for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+          ClientUtils.writeXML(entry.getKey(), writer);
         }
+        
+        writer.write("</add>");
       }
-      writer.write("</add>");
     }
     
     // Add the delete commands
     boolean deleteI = deleteById != null && deleteById.size() > 0;
     boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
-    if( deleteI || deleteQ ) {
-      if(commitWithin>0) {
-        writer.append( "<delete commitWithin=\"" + commitWithin + "\">" );
+    if (deleteI || deleteQ) {
+      if (commitWithin > 0) {
+        writer.append("<delete commitWithin=\"" + commitWithin + "\">");
       } else {
-        writer.append( "<delete>" );
+        writer.append("<delete>");
       }
-      if( deleteI ) {
-        for( String id : deleteById ) {
-          writer.append( "<id>" );
-          XML.escapeCharData( id, writer );
-          writer.append( "</id>" );
+      if (deleteI) {
+        for (Map.Entry<String,Map<String,Object>> entry : deleteById.entrySet()) {
+          writer.append("<id");
+          Map<String,Object> map = entry.getValue();
+          if (map != null) {
+            Long version = (Long) map.get("ver");
+            if (version != null) {
+              writer.append(" version=\"" + version + "\"");
+            }
+          }
+          writer.append(">");
+          
+          XML.escapeCharData(entry.getKey(), writer);
+          writer.append("</id>");
         }
       }
-      if( deleteQ ) {
-        for( String q : deleteQuery ) {
-          writer.append( "<query>" );
-          XML.escapeCharData( q, writer );
-          writer.append( "</query>" );
+      if (deleteQ) {
+        for (String q : deleteQuery) {
+          writer.append("<query>");
+          XML.escapeCharData(q, writer);
+          writer.append("</query>");
         }
       }
-      writer.append( "</delete>" );
+      writer.append("</delete>");
     }
   }
-
-
-  //--------------------------------------------------------------------------
-  //--------------------------------------------------------------------------
-
-  //--------------------------------------------------------------------------
-  // 
-  //--------------------------------------------------------------------------
-
+  
+  // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
+  
+  // --------------------------------------------------------------------------
+  //
+  // --------------------------------------------------------------------------
+  
   public List<SolrInputDocument> getDocuments() {
+    if (documents == null) return null;
+    List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(documents.size());
+    docs.addAll(documents.keySet());
+    return docs;
+  }
+  
+  public Map<SolrInputDocument,Map<String,Object>> getDocumentsMap() {
     return documents;
   }
-
+  
   public Iterator<SolrInputDocument> getDocIterator() {
     return docIterator;
   }
-
+  
   public List<String> getDeleteById() {
+    if (deleteById == null) return null;
+    List<String> deletes = new ArrayList<String>(deleteById.keySet());
+    return deletes;
+  }
+  
+  public Map<String,Map<String,Object>> getDeleteByIdMap() {
     return deleteById;
   }
-
+  
   public List<String> getDeleteQuery() {
     return deleteQuery;
   }
-
+  
 }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java Wed Sep 11 04:03:04 2013
@@ -19,13 +19,22 @@ package org.apache.solr.client.solrj.imp
 
 import java.io.File;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
 import org.apache.solr.cloud.AbstractZkTestCase;
+import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.util.ExternalPaths;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -100,6 +109,92 @@ public class CloudSolrServerTest extends
 
     del("*:*");
 
+    commit();
+    
+    SolrInputDocument doc1 = new SolrInputDocument();
+    doc1.addField(id, "0");
+    doc1.addField("a_t", "hello1");
+    SolrInputDocument doc2 = new SolrInputDocument();
+    doc2.addField(id, "2");
+    doc2.addField("a_t", "hello2");
+    
+    UpdateRequest request = new UpdateRequest();
+    request.add(doc1);
+    request.add(doc2);
+    request.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
+    
+    // Test single threaded routed updates for UpdateRequest
+    NamedList response = cloudClient.request(request);
+    CloudSolrServer.RouteResponse rr = (CloudSolrServer.RouteResponse) response;
+    Map<String,LBHttpSolrServer.Req> routes = rr.getRoutes();
+    Iterator<Map.Entry<String,LBHttpSolrServer.Req>> it = routes.entrySet()
+        .iterator();
+    while (it.hasNext()) {
+      Map.Entry<String,LBHttpSolrServer.Req> entry = it.next();
+      String url = entry.getKey();
+      UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
+          .getRequest();
+      SolrInputDocument doc = updateRequest.getDocuments().get(0);
+      String id = doc.getField("id").getValue().toString();
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add("q", "id:" + id);
+      params.add("distrib", "false");
+      QueryRequest queryRequest = new QueryRequest(params);
+      HttpSolrServer solrServer = new HttpSolrServer(url);
+      QueryResponse queryResponse = queryRequest.process(solrServer);
+      SolrDocumentList docList = queryResponse.getResults();
+      assertTrue(docList.getNumFound() == 1);
+    }
+    
+    // Test the deleteById routing for UpdateRequest
+    
+    UpdateRequest delRequest = new UpdateRequest();
+    delRequest.deleteById("0");
+    delRequest.deleteById("2");
+    delRequest.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, false);
+    cloudClient.request(delRequest);
+    ModifiableSolrParams qParams = new ModifiableSolrParams();
+    qParams.add("q", "*:*");
+    QueryRequest qRequest = new QueryRequest(qParams);
+    QueryResponse qResponse = qRequest.process(cloudClient);
+    SolrDocumentList docs = qResponse.getResults();
+    assertTrue(docs.getNumFound() == 0);
+    
+    // Test Multi-Threaded routed updates for UpdateRequest
+    
+    CloudSolrServer threadedClient = null;
+    try {
+      threadedClient = new CloudSolrServer(zkServer.getZkAddress());
+      threadedClient.setParallelUpdates(true);
+      threadedClient.setDefaultCollection("collection1");
+      response = threadedClient.request(request);
+      rr = (CloudSolrServer.RouteResponse) response;
+      routes = rr.getRoutes();
+      it = routes.entrySet()
+          .iterator();
+      while (it.hasNext()) {
+        Map.Entry<String,LBHttpSolrServer.Req> entry = it.next();
+        String url = entry.getKey();
+        UpdateRequest updateRequest = (UpdateRequest) entry.getValue()
+            .getRequest();
+        SolrInputDocument doc = updateRequest.getDocuments().get(0);
+        String id = doc.getField("id").getValue().toString();
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.add("q", "id:" + id);
+        params.add("distrib", "false");
+        QueryRequest queryRequest = new QueryRequest(params);
+        HttpSolrServer solrServer = new HttpSolrServer(url);
+        QueryResponse queryResponse = queryRequest.process(solrServer);
+        SolrDocumentList docList = queryResponse.getResults();
+        assertTrue(docList.getNumFound() == 1);
+      }
+    } finally {
+      threadedClient.shutdown();
+    }
+    
+    del("*:*");
+    commit();
+    
     indexr(id, 0, "a_t", "to come to the aid of their country.");
     
     CloudJettyRunner shard1Leader = shardToLeaderJetty.get("shard1");

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/test/org/apache/solr/client/solrj/request/TestUpdateRequestCodec.java Wed Sep 11 04:03:04 2013
@@ -16,23 +16,22 @@
  */
 package org.apache.solr.client.solrj.request;
 
-import junit.framework.Assert;
-
-import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
-import org.apache.solr.common.util.FastInputStream;
-import org.junit.Test;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.Set;
-import java.util.HashSet;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.junit.Test;
 
 /**
  * Test for UpdateRequestCodec
@@ -93,7 +92,7 @@ public class TestUpdateRequestCodec exte
     };
 
     UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
-    Assert.assertNull(updateUnmarshalled.getDocuments());
+
     for (SolrInputDocument document : docs) {
       updateUnmarshalled.add(document);
     }
@@ -144,7 +143,7 @@ public class TestUpdateRequestCodec exte
     };
 
     UpdateRequest updateUnmarshalled = codec.unmarshal(new ByteArrayInputStream(baos.toByteArray()) ,handler);
-    Assert.assertNull(updateUnmarshalled.getDocuments());
+ 
     for (SolrInputDocument document : docs) {
       updateUnmarshalled.add(document);
     }

Modified: lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1521726&r1=1521725&r2=1521726&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/branches/branch_4x/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Wed Sep 11 04:03:04 2013
@@ -246,6 +246,7 @@ public abstract class AbstractFullDistri
   protected CloudSolrServer createCloudClient(String defaultCollection)
       throws MalformedURLException {
     CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+    server.setParallelUpdates(random().nextBoolean());
     if (defaultCollection != null) server.setDefaultCollection(defaultCollection);
     server.getLbServer().getHttpClient().getParams()
         .setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 5000);
@@ -1696,6 +1697,7 @@ public abstract class AbstractFullDistri
       synchronized(this) {
         try {
           commondCloudSolrServer = new CloudSolrServer(zkServer.getZkAddress(), random().nextBoolean());
+          commondCloudSolrServer.setParallelUpdates(random().nextBoolean());
           commondCloudSolrServer.setDefaultCollection(DEFAULT_COLLECTION);
           commondCloudSolrServer.connect();
         } catch (MalformedURLException e) {