You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2013/10/21 20:58:44 UTC

svn commit: r1534320 [37/39] - in /lucene/dev/branches/lucene4956: ./ dev-tools/ dev-tools/idea/.idea/ dev-tools/idea/lucene/expressions/ dev-tools/idea/solr/contrib/velocity/ dev-tools/maven/ dev-tools/maven/lucene/ dev-tools/maven/lucene/expressions/...

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Mon Oct 21 18:58:24 2013
@@ -30,18 +30,31 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -49,7 +62,9 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.KeeperException;
 
@@ -58,6 +73,10 @@ import org.apache.zookeeper.KeeperExcept
  * Instances of this class communicate with Zookeeper to discover
  * Solr endpoints for SolrCloud collections, and then use the 
  * {@link LBHttpSolrServer} to issue requests.
+ * 
+ * This class assumes the id field for your documents is called
+ * 'id' - if this is not the case, you must set the right name
+ * with {@link #setIdField(String)}.
  */
 public class CloudSolrServer extends SolrServer {
   private volatile ZkStateReader zkStateReader;
@@ -65,22 +84,37 @@ public class CloudSolrServer extends Sol
   private int zkConnectTimeout = 10000;
   private int zkClientTimeout = 10000;
   private volatile String defaultCollection;
-  private LBHttpSolrServer lbServer;
+  private final LBHttpSolrServer lbServer;
+  private final boolean shutdownLBHttpSolrServer;
   private HttpClient myClient;
   Random rand = new Random();
   
-  private Object cachLock = new Object();
-  // since the state shouldn't change often, should be very cheap reads
-  private Map<String,List<String>> urlLists = new HashMap<String,List<String>>();
-  private Map<String,List<String>> leaderUrlLists = new HashMap<String,List<String>>();
-
-  private Map<String,List<String>> replicasLists = new HashMap<String,List<String>>();
-  
-  private volatile int lastClusterStateHashCode;
-  
   private final boolean updatesToLeaders;
+  private boolean parallelUpdates = true;
+  private ExecutorService threadPool = Executors
+      .newCachedThreadPool(new SolrjNamedThreadFactory(
+          "CloudSolrServer ThreadPool"));
+  private String idField = "id";
+  private final Set<String> NON_ROUTABLE_PARAMS;
+  {
+    NON_ROUTABLE_PARAMS = new HashSet<String>();
+    NON_ROUTABLE_PARAMS.add(UpdateParams.EXPUNGE_DELETES);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.MAX_OPTIMIZE_SEGMENTS);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.WAIT_SEARCHER);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPEN_SEARCHER);
+    
+    NON_ROUTABLE_PARAMS.add(UpdateParams.SOFT_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.PREPARE_COMMIT);
+    NON_ROUTABLE_PARAMS.add(UpdateParams.OPTIMIZE);
+    
+    // Not supported via SolrCloud
+    // NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
+
+  }
+
+
 
-  
   /**
    * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
    * in the form HOST:PORT.
@@ -89,15 +123,22 @@ public class CloudSolrServer extends Sol
       this.zkHost = zkHost;
       this.myClient = HttpClientUtil.createClient(null);
       this.lbServer = new LBHttpSolrServer(myClient);
+      this.lbServer.setRequestWriter(new BinaryRequestWriter());
+      this.lbServer.setParser(new BinaryResponseParser());
       this.updatesToLeaders = true;
+      shutdownLBHttpSolrServer = true;
   }
   
-  public CloudSolrServer(String zkHost, boolean updatesToLeaders) throws MalformedURLException {
+  public CloudSolrServer(String zkHost, boolean updatesToLeaders)
+      throws MalformedURLException {
     this.zkHost = zkHost;
     this.myClient = HttpClientUtil.createClient(null);
     this.lbServer = new LBHttpSolrServer(myClient);
+    this.lbServer.setRequestWriter(new BinaryRequestWriter());
+    this.lbServer.setParser(new BinaryResponseParser());
     this.updatesToLeaders = updatesToLeaders;
-}
+    shutdownLBHttpSolrServer = true;
+  }
 
   /**
    * @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@@ -108,6 +149,7 @@ public class CloudSolrServer extends Sol
     this.zkHost = zkHost;
     this.lbServer = lbServer;
     this.updatesToLeaders = true;
+    shutdownLBHttpSolrServer = false;
   }
   
   /**
@@ -120,11 +162,50 @@ public class CloudSolrServer extends Sol
     this.zkHost = zkHost;
     this.lbServer = lbServer;
     this.updatesToLeaders = updatesToLeaders;
+    shutdownLBHttpSolrServer = false;
+  }
+  
+  public ResponseParser getParser() {
+    return lbServer.getParser();
+  }
+  
+  /**
+   * Note: This setter method is <b>not thread-safe</b>.
+   * 
+   * @param processor
+   *          Default Response Parser chosen to parse the response if the parser
+   *          was not specified as part of the request.
+   * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+   */
+  public void setParser(ResponseParser processor) {
+    lbServer.setParser(processor);
+  }
+  
+  public RequestWriter getRequestWriter() {
+    return lbServer.getRequestWriter();
+  }
+  
+  public void setRequestWriter(RequestWriter requestWriter) {
+    lbServer.setRequestWriter(requestWriter);
   }
 
   public ZkStateReader getZkStateReader() {
     return zkStateReader;
   }
+
+  /**
+   * @param idField the field to route documents on.
+   */
+  public void setIdField(String idField) {
+    this.idField = idField;
+  }
+
+  /**
+   * @return the field that updates are routed on.
+   */
+  public String getIdField() {
+    return idField;
+  }
   
   /** Sets the default collection for request */
   public void setDefaultCollection(String collection) {
@@ -156,8 +237,8 @@ public class CloudSolrServer extends Sol
       synchronized (this) {
         if (zkStateReader == null) {
           try {
-            ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout,
-                zkClientTimeout);
+            ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout,
+                zkConnectTimeout);
             zk.createClusterStateWatchersAndUpdate();
             zkStateReader = zk;
           } catch (InterruptedException e) {
@@ -179,18 +260,259 @@ public class CloudSolrServer extends Sol
     }
   }
 
+  public void setParallelUpdates(boolean parallelUpdates) {
+    this.parallelUpdates = parallelUpdates;
+  }
+
+  private NamedList directUpdate(AbstractUpdateRequest request, ClusterState clusterState) throws SolrServerException {
+    UpdateRequest updateRequest = (UpdateRequest) request;
+    ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+    ModifiableSolrParams routableParams = new ModifiableSolrParams();
+    ModifiableSolrParams nonRoutableParams = new ModifiableSolrParams();
+
+    if(params != null) {
+      nonRoutableParams.add(params);
+      routableParams.add(params);
+      for(String param : NON_ROUTABLE_PARAMS) {
+        routableParams.remove(param);
+      }
+    }
+
+    String collection = nonRoutableParams.get("collection", defaultCollection);
+    if (collection == null) {
+      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+    }
+
+
+    //Check to see if the collection is an alias.
+    Aliases aliases = zkStateReader.getAliases();
+    if(aliases != null) {
+      Map<String, String> collectionAliases = aliases.getCollectionAliasMap();
+      if(collectionAliases != null && collectionAliases.containsKey(collection)) {
+        collection = collectionAliases.get(collection);
+      }
+    }
+
+    DocCollection col = clusterState.getCollection(collection);
+
+    DocRouter router = col.getRouter();
+    
+    if (router instanceof ImplicitDocRouter) {
+      // short circuit as optimization
+      return null;
+    }
+
+    //Create the URL map, which is keyed on slice name.
+    //The value is a list of URLs for each replica in the slice.
+    //The first value in the list is the leader for the slice.
+    Map<String,List<String>> urlMap = buildUrlMap(col);
+    if (urlMap == null) {
+      // we could not find a leader yet - use unoptimized general path
+      return null;
+    }
+
+    NamedList exceptions = new NamedList();
+    NamedList shardResponses = new NamedList();
+
+    Map<String, LBHttpSolrServer.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+    if (routes == null) {
+      return null;
+    }
+
+    long start = System.nanoTime();
+
+    if (parallelUpdates) {
+      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<String, Future<NamedList<?>>>(routes.size());
+      for (final Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
+        final String url = entry.getKey();
+        final LBHttpSolrServer.Req lbRequest = entry.getValue();
+        responseFutures.put(url, threadPool.submit(new Callable<NamedList<?>>() {
+          @Override
+          public NamedList<?> call() throws Exception {
+            return lbServer.request(lbRequest).getResponse();
+          }
+        }));
+      }
+
+      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
+        final String url = entry.getKey();
+        final Future<NamedList<?>> responseFuture = entry.getValue();
+        try {
+          shardResponses.add(url, responseFuture.get());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          exceptions.add(url, e.getCause());
+        }
+      }
+
+      if (exceptions.size() > 0) {
+        throw new RouteException(ErrorCode.SERVER_ERROR, exceptions, routes);
+      }
+    } else {
+      for (Map.Entry<String, LBHttpSolrServer.Req> entry : routes.entrySet()) {
+        String url = entry.getKey();
+        LBHttpSolrServer.Req lbRequest = entry.getValue();
+        try {
+          NamedList rsp = lbServer.request(lbRequest).getResponse();
+          shardResponses.add(url, rsp);
+        } catch (Exception e) {
+          throw new SolrServerException(e);
+        }
+      }
+    }
+
+    UpdateRequest nonRoutableRequest = null;
+    List<String> deleteQuery = updateRequest.getDeleteQuery();
+    if (deleteQuery != null && deleteQuery.size() > 0) {
+      UpdateRequest deleteQueryRequest = new UpdateRequest();
+      deleteQueryRequest.setDeleteQuery(deleteQuery);
+      nonRoutableRequest = deleteQueryRequest;
+    }
+    
+    Set<String> paramNames = nonRoutableParams.getParameterNames();
+    
+    Set<String> intersection = new HashSet<String>(paramNames);
+    intersection.retainAll(NON_ROUTABLE_PARAMS);
+    
+    if (nonRoutableRequest != null || intersection.size() > 0) {
+      if (nonRoutableRequest == null) {
+        nonRoutableRequest = new UpdateRequest();
+      }
+      nonRoutableRequest.setParams(nonRoutableParams);
+      List<String> urlList = new ArrayList<String>();
+      urlList.addAll(routes.keySet());
+      Collections.shuffle(urlList, rand);
+      LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(nonRoutableRequest, urlList);
+      try {
+        LBHttpSolrServer.Rsp rsp = lbServer.request(req);
+        shardResponses.add(urlList.get(0), rsp.getResponse());
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, urlList.get(0), e);
+      }
+    }
+
+    long end = System.nanoTime();
+
+    RouteResponse rr =  condenseResponse(shardResponses, (long)((end - start)/1000000));
+    rr.setRouteResponses(shardResponses);
+    rr.setRoutes(routes);
+    return rr;
+  }
+
+  private Map<String,List<String>> buildUrlMap(DocCollection col) {
+    Map<String, List<String>> urlMap = new HashMap<String, List<String>>();
+    Collection<Slice> slices = col.getActiveSlices();
+    Iterator<Slice> sliceIterator = slices.iterator();
+    while (sliceIterator.hasNext()) {
+      Slice slice = sliceIterator.next();
+      String name = slice.getName();
+      List<String> urls = new ArrayList<String>();
+      Replica leader = slice.getLeader();
+      if (leader == null) {
+        // take unoptimized general path - we cannot find a leader yet
+        return null;
+      }
+      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
+      String url = zkProps.getBaseUrl() + "/" + col.getName();
+      urls.add(url);
+      Collection<Replica> replicas = slice.getReplicas();
+      Iterator<Replica> replicaIterator = replicas.iterator();
+      while (replicaIterator.hasNext()) {
+        Replica replica = replicaIterator.next();
+        if (!replica.getNodeName().equals(leader.getNodeName()) &&
+            !replica.getName().equals(leader.getName())) {
+          ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+          String url1 = zkProps1.getBaseUrl() + "/" + col.getName();
+          urls.add(url1);
+        }
+      }
+      urlMap.put(name, urls);
+    }
+    return urlMap;
+  }
+
+  public RouteResponse condenseResponse(NamedList response, long timeMillis) {
+    RouteResponse condensed = new RouteResponse();
+    int status = 0;
+    for(int i=0; i<response.size(); i++) {
+      NamedList shardResponse = (NamedList)response.getVal(i);
+      NamedList header = (NamedList)shardResponse.get("responseHeader");
+      Integer shardStatus = (Integer)header.get("status");
+      int s = shardStatus.intValue();
+      if(s > 0) {
+          status = s;
+      }
+    }
+
+    NamedList cheader = new NamedList();
+    cheader.add("status", status);
+    cheader.add("QTime", timeMillis);
+    condensed.add("responseHeader", cheader);
+    return condensed;
+  }
+
+  class RouteResponse extends NamedList {
+    private NamedList routeResponses;
+    private Map<String, LBHttpSolrServer.Req> routes;
+
+    public void setRouteResponses(NamedList routeResponses) {
+      this.routeResponses = routeResponses;
+    }
+
+    public NamedList getRouteResponses() {
+      return routeResponses;
+    }
+
+    public void setRoutes(Map<String, LBHttpSolrServer.Req> routes) {
+      this.routes = routes;
+    }
+
+    public Map<String, LBHttpSolrServer.Req> getRoutes() {
+      return routes;
+    }
+
+  }
+
+  class RouteException extends SolrException {
+    // Carries the failures recorded for individual routes together with the routes that were attempted.
+    private NamedList exceptions;
+    private Map<String, LBHttpSolrServer.Req> routes;
+    // Entries are ExecutionException causes (Throwable, possibly an Error), so cast to Throwable, not Exception.
+    public RouteException(ErrorCode errorCode, NamedList exceptions, Map<String, LBHttpSolrServer.Req> routes){
+      super(errorCode, ((Throwable)exceptions.getVal(0)).getMessage(), (Throwable)exceptions.getVal(0));
+      this.exceptions = exceptions;
+      this.routes = routes;
+    }
+
+    public NamedList getExceptions() {
+      return exceptions;
+    }
+
+    public Map<String, LBHttpSolrServer.Req> getRoutes() {
+      return this.routes;
+    }
+  }
+
   @Override
   public NamedList<Object> request(SolrRequest request)
       throws SolrServerException, IOException {
     connect();
     
-    // TODO: if you can hash here, you could favor the shard leader
-    
     ClusterState clusterState = zkStateReader.getClusterState();
+    
     boolean sendToLeaders = false;
     List<String> replicas = null;
     
-    if (request instanceof IsUpdateRequest && updatesToLeaders) {
+    if (request instanceof IsUpdateRequest) {
+      if (request instanceof UpdateRequest) {
+        NamedList response = directUpdate((AbstractUpdateRequest) request,
+            clusterState);
+        if (response != null) {
+          return response;
+        }
+      }
       sendToLeaders = true;
       replicas = new ArrayList<String>();
     }
@@ -200,13 +522,16 @@ public class CloudSolrServer extends Sol
       reqParams = new ModifiableSolrParams();
     }
     List<String> theUrlList = new ArrayList<String>();
-    if (request.getPath().equals("/admin/collections") || request.getPath().equals("/admin/cores")) {
+    if (request.getPath().equals("/admin/collections")
+        || request.getPath().equals("/admin/cores")) {
       Set<String> liveNodes = clusterState.getLiveNodes();
       for (String liveNode : liveNodes) {
         int splitPointBetweenHostPortAndContext = liveNode.indexOf("_");
         theUrlList.add("http://"
-            + liveNode.substring(0, splitPointBetweenHostPortAndContext) + "/"
-            + URLDecoder.decode(liveNode, "UTF-8").substring(splitPointBetweenHostPortAndContext + 1));
+            + liveNode.substring(0, splitPointBetweenHostPortAndContext)
+            + "/"
+            + URLDecoder.decode(liveNode, "UTF-8").substring(
+                splitPointBetweenHostPortAndContext + 1));
       }
     } else {
       String collection = reqParams.get("collection", defaultCollection);
@@ -218,14 +543,15 @@ public class CloudSolrServer extends Sol
       
       Set<String> collectionsList = getCollectionList(clusterState, collection);
       if (collectionsList.size() == 0) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Could not find collection: " + collection);
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "Could not find collection: " + collection);
       }
       collection = collectionsList.iterator().next();
       
       StringBuilder collectionString = new StringBuilder();
       Iterator<String> it = collectionsList.iterator();
       for (int i = 0; i < collectionsList.size(); i++) {
-        String col = it.next(); 
+        String col = it.next();
         collectionString.append(col);
         if (i < collectionsList.size() - 1) {
           collectionString.append(",");
@@ -240,75 +566,67 @@ public class CloudSolrServer extends Sol
       // add it to the Map of slices.
       Map<String,Slice> slices = new HashMap<String,Slice>();
       for (String collectionName : collectionsList) {
-        Collection<Slice> colSlices = clusterState.getActiveSlices(collectionName);
+        Collection<Slice> colSlices = clusterState
+            .getActiveSlices(collectionName);
         if (colSlices == null) {
-          throw new SolrServerException("Could not find collection:" + collectionName);
+          throw new SolrServerException("Could not find collection:"
+              + collectionName);
         }
         ClientUtils.addSlices(slices, collectionName, colSlices, true);
       }
       Set<String> liveNodes = clusterState.getLiveNodes();
       
-      synchronized (cachLock) {
-        List<String> leaderUrlList = leaderUrlLists.get(collection);
-        List<String> urlList = urlLists.get(collection);
-        List<String> replicasList = replicasLists.get(collection);
-        
-        if ((sendToLeaders && leaderUrlList == null)
-            || (!sendToLeaders && urlList == null)
-            || clusterState.hashCode() != this.lastClusterStateHashCode) {
-          // build a map of unique nodes
-          // TODO: allow filtering by group, role, etc
-          Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
-          List<String> urlList2 = new ArrayList<String>();
-          for (Slice slice : slices.values()) {
-            for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
-              ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
-              String node = coreNodeProps.getNodeName();
-              if (!liveNodes.contains(coreNodeProps.getNodeName())
-                  || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
-              if (nodes.put(node, nodeProps) == null) {
-                if (!sendToLeaders
-                    || (sendToLeaders && coreNodeProps.isLeader())) {
-                  String url = coreNodeProps.getCoreUrl();
-                  urlList2.add(url);
-                } else if (sendToLeaders) {
-                  String url = coreNodeProps.getCoreUrl();
-                  replicas.add(url);
-                }
-              }
+      List<String> leaderUrlList = null;
+      List<String> urlList = null;
+      List<String> replicasList = null;
+      
+      // build a map of unique nodes
+      // TODO: allow filtering by group, role, etc
+      Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
+      List<String> urlList2 = new ArrayList<String>();
+      for (Slice slice : slices.values()) {
+        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
+          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+          String node = coreNodeProps.getNodeName();
+          if (!liveNodes.contains(coreNodeProps.getNodeName())
+              || !coreNodeProps.getState().equals(ZkStateReader.ACTIVE)) continue;
+          if (nodes.put(node, nodeProps) == null) {
+            if (!sendToLeaders || (sendToLeaders && coreNodeProps.isLeader())) {
+              String url = coreNodeProps.getCoreUrl();
+              urlList2.add(url);
+            } else if (sendToLeaders) {
+              String url = coreNodeProps.getCoreUrl();
+              replicas.add(url);
             }
           }
-          
-          if (sendToLeaders) {
-            this.leaderUrlLists.put(collection, urlList2);
-            leaderUrlList = urlList2;
-            this.replicasLists.put(collection, replicas);
-            replicasList = replicas;
-          } else {
-            this.urlLists.put(collection, urlList2);
-            urlList = urlList2;
-          }
-          this.lastClusterStateHashCode = clusterState.hashCode();
-        }
-        
-        if (sendToLeaders) {
-          theUrlList = new ArrayList<String>(leaderUrlList.size());
-          theUrlList.addAll(leaderUrlList);
-        } else {
-          theUrlList = new ArrayList<String>(urlList.size());
-          theUrlList.addAll(urlList);
-        }
-        Collections.shuffle(theUrlList, rand);
-        if (sendToLeaders) {
-          ArrayList<String> theReplicas = new ArrayList<String>(
-              replicasList.size());
-          theReplicas.addAll(replicasList);
-          Collections.shuffle(theReplicas, rand);
-          // System.out.println("leaders:" + theUrlList);
-          // System.out.println("replicas:" + theReplicas);
-          theUrlList.addAll(theReplicas);
         }
       }
+      
+      if (sendToLeaders) {
+        leaderUrlList = urlList2;
+        replicasList = replicas;
+      } else {
+        urlList = urlList2;
+      }
+      
+      if (sendToLeaders) {
+        theUrlList = new ArrayList<String>(leaderUrlList.size());
+        theUrlList.addAll(leaderUrlList);
+      } else {
+        theUrlList = new ArrayList<String>(urlList.size());
+        theUrlList.addAll(urlList);
+      }
+      Collections.shuffle(theUrlList, rand);
+      if (sendToLeaders) {
+        ArrayList<String> theReplicas = new ArrayList<String>(
+            replicasList.size());
+        theReplicas.addAll(replicasList);
+        Collections.shuffle(theReplicas, rand);
+        // System.out.println("leaders:" + theUrlList);
+        // System.out.println("replicas:" + theReplicas);
+        theUrlList.addAll(theReplicas);
+      }
+      
     }
     
     // System.out.println("########################## MAKING REQUEST TO " +
@@ -352,9 +670,18 @@ public class CloudSolrServer extends Sol
         zkStateReader = null;
       }
     }
+    
+    if (shutdownLBHttpSolrServer) {
+      lbServer.shutdown();
+    }
+    
     if (myClient!=null) {
       myClient.getConnectionManager().shutdown();
     }
+
+    if(this.threadPool != null && !this.threadPool.isShutdown()) {
+      this.threadPool.shutdown();
+    }
   }
 
   public LBHttpSolrServer getLbServer() {
@@ -365,19 +692,4 @@ public class CloudSolrServer extends Sol
     return updatesToLeaders;
   }
 
-  // for tests
-  Map<String,List<String>> getUrlLists() {
-    return urlLists;
-  }
-
-  //for tests
-  Map<String,List<String>> getLeaderUrlLists() {
-    return leaderUrlLists;
-  }
-
-  //for tests
-  Map<String,List<String>> getReplicasLists() {
-    return replicasLists;
-  }
-
 }

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java Mon Oct 21 18:58:24 2013
@@ -44,6 +44,8 @@ import org.apache.solr.client.solrj.Solr
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -72,14 +74,15 @@ public class ConcurrentUpdateSolrServer 
       .getLogger(ConcurrentUpdateSolrServer.class);
   private HttpSolrServer server;
   final BlockingQueue<UpdateRequest> queue;
-  final ExecutorService scheduler = Executors.newCachedThreadPool(
-      new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
+  final ExecutorService scheduler;
   final Queue<Runner> runners;
   volatile CountDownLatch lock = null; // used to block everything
   final int threadCount;
+  boolean shutdownExecutor = false;
+  int pollQueueTime = 250;
 
   /**
-   * Uses an internaly managed HttpClient instance.
+   * Uses an internally managed HttpClient instance.
    * 
    * @param solrServerUrl
    *          The Solr server URL
@@ -91,18 +94,27 @@ public class ConcurrentUpdateSolrServer 
   public ConcurrentUpdateSolrServer(String solrServerUrl, int queueSize,
       int threadCount) {
     this(solrServerUrl, null, queueSize, threadCount);
+    shutdownExecutor = true;
+  }
+  
+  /** Uses the supplied HttpClient; the scheduler is created here and is stopped by shutdown(). */
+  public ConcurrentUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount) {
+    this(solrServerUrl, client, queueSize, threadCount, // forward the caller's client rather than null
+        Executors.newCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler")));
+    shutdownExecutor = true;
+  }
 
   /**
    * Uses the supplied HttpClient to send documents to the Solr server.
    */
   public ConcurrentUpdateSolrServer(String solrServerUrl,
-      HttpClient client, int queueSize, int threadCount) {
+      HttpClient client, int queueSize, int threadCount, ExecutorService es) {
     this.server = new HttpSolrServer(solrServerUrl, client);
     this.server.setFollowRedirects(false);
     queue = new LinkedBlockingQueue<UpdateRequest>(queueSize);
     this.threadCount = threadCount;
     runners = new LinkedList<Runner>();
+    scheduler = es;
   }
 
   /**
@@ -115,8 +127,7 @@ public class ConcurrentUpdateSolrServer 
     public void run() {
       runnerLock.lock();
 
-      // info is ok since this should only happen once for each thread
-      log.info("starting runner: {}", this);
+      log.debug("starting runner: {}", this);
       HttpPost method = null;
       HttpResponse response = null;
       try {
@@ -169,14 +180,15 @@ public class ConcurrentUpdateSolrServer 
                       }
                     }
                     out.flush();
-                    req = queue.poll(250, TimeUnit.MILLISECONDS);
+                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
                   }
                   if (isXml) {
                     out.write("</stream>".getBytes("UTF-8"));
                   }
 
                 } catch (InterruptedException e) {
-                  e.printStackTrace();
+                  Thread.currentThread().interrupt();
+                  log.warn("", e);
                 }
               }
             });
@@ -196,16 +208,13 @@ public class ConcurrentUpdateSolrServer 
             
             response = server.getHttpClient().execute(method);
             int statusCode = response.getStatusLine().getStatusCode();
-            log.info("Status for: "
-                + updateRequest.getDocuments().get(0).getFieldValue("id")
-                + " is " + statusCode);
             if (statusCode != HttpStatus.SC_OK) {
               StringBuilder msg = new StringBuilder();
               msg.append(response.getStatusLine().getReasonPhrase());
               msg.append("\n\n");
               msg.append("\n\n");
               msg.append("request: ").append(method.getURI());
-              handleError(new Exception(msg.toString()));
+              handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString()));
             }
           } finally {
             try {
@@ -213,6 +222,7 @@ public class ConcurrentUpdateSolrServer 
                 response.getEntity().getContent().close();
               }
             } catch (Exception ex) {
+              log.warn("", ex);
             }
           }
         }
@@ -236,7 +246,7 @@ public class ConcurrentUpdateSolrServer 
           }
         }
 
-        log.info("finished: {}", this);
+        log.debug("finished: {}", this);
         runnerLock.unlock();
       }
     }
@@ -357,16 +367,18 @@ public class ConcurrentUpdateSolrServer 
   @Override
   public void shutdown() {
     server.shutdown();
-    scheduler.shutdown();
-    try {
-      if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+    if (shutdownExecutor) {
+      scheduler.shutdown();
+      try {
+        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+          scheduler.shutdownNow();
+          if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+              .error("ExecutorService did not terminate");
+        }
+      } catch (InterruptedException ie) {
         scheduler.shutdownNow();
-        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS))
-          log.error("ExecutorService did not terminate");
+        Thread.currentThread().interrupt();
       }
-    } catch (InterruptedException ie) {
-      scheduler.shutdownNow();
-      Thread.currentThread().interrupt();
     }
   }
   
@@ -384,19 +396,30 @@ public class ConcurrentUpdateSolrServer 
 
   public void shutdownNow() {
     server.shutdown();
-    scheduler.shutdownNow(); // Cancel currently executing tasks
-    try {
-      if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
-        log.error("ExecutorService did not terminate");
-    } catch (InterruptedException ie) {
-      scheduler.shutdownNow();
-      Thread.currentThread().interrupt();
+    if (shutdownExecutor) {
+      scheduler.shutdownNow(); // Cancel currently executing tasks
+      try {
+        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
+            .error("ExecutorService did not terminate");
+      } catch (InterruptedException ie) {
+        scheduler.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
   public void setParser(ResponseParser responseParser) {
     server.setParser(responseParser);
   }
+  
+  
+  /**
+   * @param pollQueueTime time for an open connection to wait for updates when
+   * the queue is empty. 
+   */
+  public void setPollQueueTime(int pollQueueTime) {
+    this.pollQueueTime = pollQueueTime;
+  }
 
   public void setRequestWriter(RequestWriter requestWriter) {
     server.setRequestWriter(requestWriter);

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java Mon Oct 21 18:58:24 2013
@@ -100,7 +100,9 @@ public class HttpClientUtil {
    */
   public static HttpClient createClient(final SolrParams params) {
     final ModifiableSolrParams config = new ModifiableSolrParams(params);
-    logger.info("Creating new http client, config:" + config);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Creating new http client, config:" + config);
+    }
     final DefaultHttpClient httpClient = new SystemDefaultHttpClient();
     configureClient(httpClient, config);
     return httpClient;

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrServer.java Mon Oct 21 18:58:24 2013
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -98,14 +99,14 @@ public class HttpSolrServer extends Solr
    * 
    * @see org.apache.solr.client.solrj.impl.BinaryResponseParser
    */
-  protected ResponseParser parser;
+  protected volatile ResponseParser parser;
   
   /**
    * The RequestWriter used to write all requests to Solr
    * 
    * @see org.apache.solr.client.solrj.request.RequestWriter
    */
-  protected RequestWriter requestWriter = new RequestWriter();
+  protected volatile RequestWriter requestWriter = new RequestWriter();
   
   private final HttpClient httpClient;
   
@@ -265,7 +266,7 @@ public class HttpSolrServer extends Solr
                 for (ContentStream content : streams) {
                   String contentType = content.getContentType();
                   if(contentType==null) {
-                    contentType = "application/octet-stream"; // default
+                    contentType = BinaryResponseParser.BINARY_CONTENT_TYPE; // default
                   }
                   String name = content.getName();
                   if(name==null) {
@@ -359,7 +360,7 @@ public class HttpSolrServer extends Solr
     
     InputStream respBody = null;
     boolean shouldClose = true;
-    
+    boolean success = false;
     try {
       // Execute the method.
       final HttpResponse response = httpClient.execute(method);
@@ -367,6 +368,13 @@ public class HttpSolrServer extends Solr
       
       // Read the contents
       respBody = response.getEntity().getContent();
+      Header ctHeader = response.getLastHeader("content-type");
+      String contentType;
+      if (ctHeader != null) {
+        contentType = ctHeader.getValue();
+      } else {
+        contentType = "";
+      }
       
       // handle some http level checks before trying to parse the response
       switch (httpStatus) {
@@ -382,19 +390,46 @@ public class HttpSolrServer extends Solr
           }
           break;
         default:
-          throw new RemoteSolrException(httpStatus, "Server at " + getBaseURL()
-              + " returned non ok status:" + httpStatus + ", message:"
-              + response.getStatusLine().getReasonPhrase(), null);
+          if (processor == null) {
+            throw new RemoteSolrException(httpStatus, "Server at "
+                + getBaseURL() + " returned non ok status:" + httpStatus
+                + ", message:" + response.getStatusLine().getReasonPhrase(),
+                null);
+          }
       }
       if (processor == null) {
+        
         // no processor specified, return raw stream
         NamedList<Object> rsp = new NamedList<Object>();
         rsp.add("stream", respBody);
         // Only case where stream should not be closed
         shouldClose = false;
+        success = true;
         return rsp;
       }
       
+      String procCt = processor.getContentType();
+      if (procCt != null) {
+        if (!contentType.equals(procCt)) {
+          // unexpected content type
+          String msg = "Expected content type " + procCt + " but got " + contentType + ".";
+          Header encodingHeader = response.getEntity().getContentEncoding();
+          String encoding;
+          if (encodingHeader != null) {
+            encoding = encodingHeader.getValue();
+          } else {
+            encoding = "UTF-8"; // try UTF-8
+          }
+          try {
+            msg = msg + " " + IOUtils.toString(respBody, encoding);
+          } catch (IOException e) {
+            throw new RemoteSolrException(httpStatus, "Could not parse response with encoding " + encoding, e);
+          }
+          RemoteSolrException e = new RemoteSolrException(httpStatus, msg, null);
+          throw e;
+        }
+      }
+      
 //      if(true) {
 //        ByteArrayOutputStream copy = new ByteArrayOutputStream();
 //        IOUtils.copy(respBody, copy);
@@ -403,8 +438,13 @@ public class HttpSolrServer extends Solr
 //        respBody = new ByteArrayInputStream(copy.toByteArray());
 //      }
       
+      NamedList<Object> rsp = null;
       String charset = EntityUtils.getContentCharSet(response.getEntity());
-      NamedList<Object> rsp = processor.processResponse(respBody, charset);
+      try {
+        rsp = processor.processResponse(respBody, charset);
+      } catch (Exception e) {
+        throw new RemoteSolrException(httpStatus, e.getMessage(), e);
+      }
       if (httpStatus != HttpStatus.SC_OK) {
         String reason = null;
         try {
@@ -423,6 +463,7 @@ public class HttpSolrServer extends Solr
         }
         throw new RemoteSolrException(httpStatus, reason, null);
       }
+      success = true;
       return rsp;
     } catch (ConnectException e) {
       throw new SolrServerException("Server refused connection at: "
@@ -439,6 +480,9 @@ public class HttpSolrServer extends Solr
         try {
           respBody.close();
         } catch (Throwable t) {} // ignore
+        if (!success) {
+          method.abort();
+        }
       }
     }
   }

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Mon Oct 21 18:58:24 2013
@@ -18,6 +18,7 @@ package org.apache.solr.client.solrj.imp
 
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.*;
+import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -93,7 +94,8 @@ public class LBHttpSolrServer extends So
   private final AtomicInteger counter = new AtomicInteger(-1);
 
   private static final SolrQuery solrQuery = new SolrQuery("*:*");
-  private final ResponseParser parser;
+  private volatile ResponseParser parser;
+  private volatile RequestWriter requestWriter;
 
   static {
     solrQuery.setRows(0);
@@ -219,11 +221,13 @@ public class LBHttpSolrServer extends So
   }
 
   protected HttpSolrServer makeServer(String server) throws MalformedURLException {
-    return new HttpSolrServer(server, httpClient, parser);
+    HttpSolrServer s = new HttpSolrServer(server, httpClient, parser);
+    if (requestWriter != null) {
+      s.setRequestWriter(requestWriter);
+    }
+    return s;
   }
 
-
-
   /**
    * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
    * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
@@ -590,6 +594,22 @@ public class LBHttpSolrServer extends So
     return httpClient;
   }
 
+  public ResponseParser getParser() {
+    return parser;
+  }
+  
+  public void setParser(ResponseParser parser) {
+    this.parser = parser;
+  }
+  
+  public void setRequestWriter(RequestWriter requestWriter) {
+    this.requestWriter = requestWriter;
+  }
+  
+  public RequestWriter getRequestWriter() {
+    return requestWriter;
+  }
+  
   @Override
   protected void finalize() throws Throwable {
     try {
@@ -603,4 +623,5 @@ public class LBHttpSolrServer extends So
   // defaults
   private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
   private static final int NONSTANDARD_PING_LIMIT = 5;  // number of times we'll ping dead servers not in the server list
+
 }

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/impl/XMLResponseParser.java Mon Oct 21 18:58:24 2013
@@ -32,6 +32,7 @@ import javax.xml.stream.XMLInputFactory;
 import javax.xml.stream.XMLStreamConstants;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
+
 import java.io.InputStream;
 import java.io.Reader;
 import java.util.ArrayList;
@@ -46,6 +47,7 @@ import java.util.Locale;
  */
 public class XMLResponseParser extends ResponseParser
 {
+  public static final String XML_CONTENT_TYPE = "application/xml; charset=UTF-8";
   public static Logger log = LoggerFactory.getLogger(XMLResponseParser.class);
   private static final XMLErrorLogger xmllog = new XMLErrorLogger(log);
 
@@ -78,6 +80,11 @@ public class XMLResponseParser extends R
   {
     return "xml";
   }
+  
+  @Override
+  public String getContentType() {
+    return XML_CONTENT_TYPE;
+  }
 
   @Override
   public NamedList<Object> processResponse(Reader in) {

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Mon Oct 21 18:58:24 2013
@@ -59,6 +59,7 @@ public class CoreAdminRequest extends So
     private String coreNodeName;
     private Boolean loadOnStartup;
     private Boolean isTransient;
+    private String collectionConfigName;
 
     public Create() {
       action = CoreAdminAction.CREATE;
@@ -76,6 +77,7 @@ public class CoreAdminRequest extends So
     public void setCoreNodeName(String coreNodeName) {this.coreNodeName = coreNodeName;}
     public void setIsTransient(Boolean isTransient) { this.isTransient = isTransient; }
     public void setIsLoadOnStartup(Boolean loadOnStartup) { this.loadOnStartup = loadOnStartup;}
+    public void setCollectionConfigName(String name) { this.collectionConfigName = name;}
 
     public String getInstanceDir() { return instanceDir; }
     public String getSchemaName()  { return schemaName; }
@@ -88,7 +90,8 @@ public class CoreAdminRequest extends So
     public String getCoreNodeName() { return coreNodeName; }
     public Boolean getIsLoadOnStartup() { return loadOnStartup; }
     public Boolean getIsTransient() { return isTransient; }
-
+    public String getCollectionConfigName() { return collectionConfigName;}
+    
     @Override
     public SolrParams getParams() {
       if( action == null ) {
@@ -137,6 +140,11 @@ public class CoreAdminRequest extends So
       if (loadOnStartup != null) {
         params.set(CoreAdminParams.LOAD_ON_STARTUP, loadOnStartup);
       }
+      
+      if (collectionConfigName != null) {
+        params.set("collection.configName", collectionConfigName);
+      }
+      
       return params;
     }
 

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/JavaBinUpdateRequestCodec.java Mon Oct 21 18:58:24 2013
@@ -23,6 +23,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -63,11 +66,14 @@ public class JavaBinUpdateRequestCodec {
     if(updateRequest.getDocIterator() != null){
       docIter = updateRequest.getDocIterator();
     }
+    
+    Map<SolrInputDocument,Map<String,Object>> docMap = updateRequest.getDocumentsMap();
 
     nl.add("params", params);// 0: params
-    nl.add("delById", updateRequest.getDeleteById());
+    nl.add("delByIdMap", updateRequest.getDeleteByIdMap());
     nl.add("delByQ", updateRequest.getDeleteQuery());
     nl.add("docs", docIter);
+    nl.add("docsMap", docMap);
     JavaBinCodec codec = new JavaBinCodec();
     codec.marshal(nl, os);
   }
@@ -86,7 +92,9 @@ public class JavaBinUpdateRequestCodec {
   public UpdateRequest unmarshal(InputStream is, final StreamingUpdateHandler handler) throws IOException {
     final UpdateRequest updateRequest = new UpdateRequest();
     List<List<NamedList>> doclist;
+    Map<SolrInputDocument,Map<String,Object>>  docMap;
     List<String> delById;
+    Map<String,Map<String,Object>> delByIdMap;
     List<String> delByQ;
     final NamedList[] namedList = new NamedList[1];
     JavaBinCodec codec = new JavaBinCodec() {
@@ -158,9 +166,11 @@ public class JavaBinUpdateRequestCodec {
       }
     }
     delById = (List<String>) namedList[0].get("delById");
+    delByIdMap = (Map<String,Map<String,Object>>) namedList[0].get("delByIdMap");
     delByQ = (List<String>) namedList[0].get("delByQ");
     doclist = (List) namedList[0].get("docs");
-
+    docMap =  (Map<SolrInputDocument,Map<String,Object>>) namedList[0].get("docsMap");
+    
     if (doclist != null && !doclist.isEmpty()) {
       List<SolrInputDocument> solrInputDocs = new ArrayList<SolrInputDocument>();
       for (Object o : doclist) {
@@ -172,11 +182,36 @@ public class JavaBinUpdateRequestCodec {
       }
       updateRequest.add(solrInputDocs);
     }
+    if (docMap != null && !docMap.isEmpty()) {
+      Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docMap.entrySet();
+      for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+        Map<String,Object> map = entry.getValue();
+        Boolean overwrite = null;
+        Integer commitWithin = null;
+        if (map != null) {
+          overwrite = (Boolean) map.get(UpdateRequest.OVERWRITE);
+          commitWithin = (Integer) map.get(UpdateRequest.COMMIT_WITHIN);
+        }
+        updateRequest.add(entry.getKey(), commitWithin, overwrite);
+      }
+    }
     if (delById != null) {
       for (String s : delById) {
         updateRequest.deleteById(s);
       }
     }
+    if (delByIdMap != null) {
+      for (Map.Entry<String,Map<String,Object>> entry : delByIdMap.entrySet()) {
+        Map<String,Object> params = entry.getValue();
+        if (params != null) {
+          Long version = (Long) params.get("ver");
+          updateRequest.deleteById(entry.getKey(), version);
+        } else {
+          updateRequest.deleteById(entry.getKey());
+        }
+  
+      }
+    }
     if (delByQ != null) {
       for (String s : delByQ) {
         updateRequest.deleteByQuery(s);

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/RequestWriter.java Mon Oct 21 18:58:24 2013
@@ -26,6 +26,7 @@ import java.io.*;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.nio.charset.Charset;
 
 /**
@@ -52,7 +53,7 @@ public class RequestWriter {
 
   private boolean isEmpty(UpdateRequest updateRequest) {
     return isNull(updateRequest.getDocuments()) &&
-            isNull(updateRequest.getDeleteById()) &&
+            isNull(updateRequest.getDeleteByIdMap()) &&
             isNull(updateRequest.getDeleteQuery()) &&
             updateRequest.getDocIterator() == null;
   }
@@ -137,4 +138,8 @@ public class RequestWriter {
   protected boolean isNull(List l) {
     return l == null || l.isEmpty();
   }
+  
+  protected boolean isNull(Map l) {
+    return l == null || l.isEmpty();
+  }
 }

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java Mon Oct 21 18:58:24 2013
@@ -22,203 +22,412 @@ import java.io.StringWriter;
 import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.LinkedHashMap;
 
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.util.XML;
 
 /**
  * 
- *
+ * 
  * @since solr 1.3
  */
 public class UpdateRequest extends AbstractUpdateRequest {
   
-  private List<SolrInputDocument> documents = null;
+  private static final String VER = "ver";
+  public static final String OVERWRITE = "ow";
+  public static final String COMMIT_WITHIN = "cw";
+  private Map<SolrInputDocument,Map<String,Object>> documents = null;
   private Iterator<SolrInputDocument> docIterator = null;
-  private List<String> deleteById = null;
+  private Map<String,Map<String,Object>> deleteById = null;
   private List<String> deleteQuery = null;
-
-  public UpdateRequest()
-  {
-    super( METHOD.POST, "/update" );
+  
+  public UpdateRequest() {
+    super(METHOD.POST, "/update");
   }
-
+  
   public UpdateRequest(String url) {
     super(METHOD.POST, url);
   }
-
-  //---------------------------------------------------------------------------
-  //---------------------------------------------------------------------------
+  
+  // ---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
   
   /**
    * clear the pending documents and delete commands
    */
-  public void clear()
-  {
-    if( documents != null ) {
+  public void clear() {
+    if (documents != null) {
       documents.clear();
     }
-    if( deleteById != null ) {
+    if (deleteById != null) {
       deleteById.clear();
     }
-    if( deleteQuery != null ) {
+    if (deleteQuery != null) {
       deleteQuery.clear();
     }
   }
   
-  //---------------------------------------------------------------------------
-  //---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
+  
+  public UpdateRequest add(final SolrInputDocument doc) {
+    if (documents == null) {
+      documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+    }
+    documents.put(doc, null);
+    return this;
+  }
+  
+  public UpdateRequest add(final SolrInputDocument doc, Boolean overwrite) {
+    return add(doc, null, overwrite);
+  }
+  
+  public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin) {
+    return add(doc, commitWithin, null);
+  }
   
-  public UpdateRequest add( final SolrInputDocument doc )
-  {
-    if( documents == null ) {
-      documents = new ArrayList<SolrInputDocument>( 2 );
+  public UpdateRequest add(final SolrInputDocument doc, Integer commitWithin,
+      Boolean overwrite) {
+    if (documents == null) {
+      documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
     }
-    documents.add( doc );
+    Map<String,Object> params = new HashMap<String,Object>(2);
+    if (commitWithin != null) params.put(COMMIT_WITHIN, commitWithin);
+    if (overwrite != null) params.put(OVERWRITE, overwrite);
+    
+    documents.put(doc, params);
+    
     return this;
   }
   
-  public UpdateRequest add( final Collection<SolrInputDocument> docs )
-  {
-    if( documents == null ) {
-      documents = new ArrayList<SolrInputDocument>( docs.size()+1 );
+  public UpdateRequest add(final Collection<SolrInputDocument> docs) {
+    if (documents == null) {
+      documents = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+    }
+    for (SolrInputDocument doc : docs) {
+      documents.put(doc, null);
     }
-    documents.addAll( docs );
     return this;
   }
   
-  public UpdateRequest deleteById( String id )
-  {
-    if( deleteById == null ) {
-      deleteById = new ArrayList<String>();
+  public UpdateRequest deleteById(String id) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<String,Map<String,Object>>();
     }
-    deleteById.add( id );
+    deleteById.put(id, null);
     return this;
   }
-  public UpdateRequest deleteById( List<String> ids )
-  {
-    if( deleteById == null ) {
-      deleteById = new ArrayList<String>(ids);
-    } else {
-      deleteById.addAll(ids);
+  
+  public UpdateRequest deleteById(List<String> ids) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<String,Map<String,Object>>();
     }
+    
+    for (String id : ids) {
+      deleteById.put(id, null);
+    }
+    
     return this;
   }
   
-  public UpdateRequest deleteByQuery( String q )
-  {
-    if( deleteQuery == null ) {
+  public UpdateRequest deleteById(String id, Long version) {
+    if (deleteById == null) {
+      deleteById = new LinkedHashMap<String,Map<String,Object>>();
+    }
+    Map<String,Object> params = new HashMap<String,Object>(1);
+    params.put(VER, version);
+    deleteById.put(id, params);
+    return this;
+  }
+  
+  public UpdateRequest deleteByQuery(String q) {
+    if (deleteQuery == null) {
       deleteQuery = new ArrayList<String>();
     }
-    deleteQuery.add( q );
+    deleteQuery.add(q);
     return this;
   }
+  
+  /**
+   * @param router to route updates with
+   * @param col DocCollection for the updates
+   * @param urlMap of the cluster
+   * @param params params to use
+   * @param idField the id field
+   * @return a Map of urls to requests
+   */
+  public Map<String,LBHttpSolrServer.Req> getRoutes(DocRouter router,
+      DocCollection col, Map<String,List<String>> urlMap,
+      ModifiableSolrParams params, String idField) {
+    
+    if ((documents == null || documents.size() == 0)
+        && (deleteById == null || deleteById.size() == 0)) {
+      return null;
+    }
+    
+    Map<String,LBHttpSolrServer.Req> routes = new HashMap<String,LBHttpSolrServer.Req>();
+    if (documents != null) {
+      Set<Entry<SolrInputDocument,Map<String,Object>>> entries = documents.entrySet();
+      for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+        SolrInputDocument doc = entry.getKey();
+        Object id = doc.getFieldValue(idField);
+        if (id == null) {
+          return null;
+        }
+        Slice slice = router.getTargetSlice(id
+            .toString(), doc, null, col);
+        if (slice == null) {
+          return null;
+        }
+        List<String> urls = urlMap.get(slice.getName());
+        String leaderUrl = urls.get(0);
+        LBHttpSolrServer.Req request = (LBHttpSolrServer.Req) routes
+            .get(leaderUrl);
+        if (request == null) {
+          UpdateRequest updateRequest = new UpdateRequest();
+          updateRequest.setMethod(getMethod());
+          updateRequest.setCommitWithin(getCommitWithin());
+          updateRequest.setParams(params);
+          updateRequest.setPath(getPath());
+          request = new LBHttpSolrServer.Req(updateRequest, urls);
+          routes.put(leaderUrl, request);
+        }
+        UpdateRequest urequest = (UpdateRequest) request.getRequest();
+        urequest.add(doc);
+      }
+    }
+    
+    // Route the deleteById's
+    
+    if (deleteById != null) {
+      
+      Iterator<Map.Entry<String,Map<String,Object>>> entries = deleteById.entrySet()
+          .iterator();
+      while (entries.hasNext()) {
+        
+        Map.Entry<String,Map<String,Object>> entry = entries.next();
+        
+        String deleteId = entry.getKey();
+        Map<String,Object> map = entry.getValue();
+        Long version = null;
+        if (map != null) {
+          version = (Long) map.get(VER);
+        }
+        Slice slice = router.getTargetSlice(deleteId, null, null, col);
+        if (slice == null) {
+          return null;
+        }
+        List<String> urls = urlMap.get(slice.getName());
+        String leaderUrl = urls.get(0);
+        LBHttpSolrServer.Req request = routes.get(leaderUrl);
+        if (request != null) {
+          UpdateRequest urequest = (UpdateRequest) request.getRequest();
+          urequest.deleteById(deleteId, version);
+        } else {
+          UpdateRequest urequest = new UpdateRequest();
+          urequest.deleteById(deleteId, version);
+          request = new LBHttpSolrServer.Req(urequest, urls);
+          routes.put(leaderUrl, request);
+        }
+      }
+    }
 
+    return routes;
+  }
+  
   public void setDocIterator(Iterator<SolrInputDocument> docIterator) {
     this.docIterator = docIterator;
   }
-
-  //--------------------------------------------------------------------------
-  //--------------------------------------------------------------------------
-
+  
+  public void setDeleteQuery(List<String> deleteQuery) {
+    this.deleteQuery = deleteQuery;
+  }
+  
+  // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
+  
   @Override
   public Collection<ContentStream> getContentStreams() throws IOException {
-    return ClientUtils.toContentStreams( getXML(), ClientUtils.TEXT_XML );
+    return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
   }
-
+  
   public String getXML() throws IOException {
     StringWriter writer = new StringWriter();
-    writeXML( writer );
+    writeXML(writer);
     writer.flush();
-
+    
     // If action is COMMIT or OPTIMIZE, it is sent with params
     String xml = writer.toString();
-    //System.out.println( "SEND:"+xml );
+    // System.out.println( "SEND:"+xml );
     return (xml.length() > 0) ? xml : null;
   }
   
+  /**
+   * Splits the given documents into consecutive groups that share the same
+   * per-document {@code OVERWRITE} and {@code COMMIT_WITHIN} settings, so that
+   * each group can be written as a single <code>&lt;add&gt;</code> element.
+   * <p>
+   * The iteration order of the supplied map is kept. A document whose settings
+   * map is {@code null} is treated as having neither setting. If a
+   * {@code docIterator} is set, it is drained into one extra trailing group
+   * (possibly empty) whose documents carry no settings; {@code null} documents
+   * returned by the iterator are skipped.
+   *
+   * @param documents the documents and their optional settings; may be
+   *        {@code null}
+   * @return the groups, in order; never {@code null}
+   */
+  private List<Map<SolrInputDocument,Map<String,Object>>> getDocLists(Map<SolrInputDocument,Map<String,Object>> documents) {
+    List<Map<SolrInputDocument,Map<String,Object>>> docLists = new ArrayList<Map<SolrInputDocument,Map<String,Object>>>();
+    Map<SolrInputDocument,Map<String,Object>> docList = null;
+    if (documents != null) {
+
+      Boolean lastOverwrite = true;
+      Integer lastCommitWithin = -1;
+
+      for (Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
+        Map<String,Object> map = entry.getValue();
+        Boolean overwrite = null;
+        Integer commitWithin = null;
+        if (map != null) {
+          overwrite = (Boolean) map.get(OVERWRITE);
+          commitWithin = (Integer) map.get(COMMIT_WITHIN);
+        }
+        // Compare the boxed settings by value, not by reference: two equal
+        // Integer values outside the small-value cache are distinct objects,
+        // and '!=' would then start a new group for every single document.
+        boolean sameOverwrite = (overwrite == null)
+            ? lastOverwrite == null
+            : overwrite.equals(lastOverwrite);
+        boolean sameCommitWithin = (commitWithin == null)
+            ? lastCommitWithin == null
+            : commitWithin.equals(lastCommitWithin);
+        // The first document always opens a group, whatever its settings are.
+        if (!sameOverwrite || !sameCommitWithin || docLists.size() == 0) {
+          docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+          docLists.add(docList);
+        }
+        docList.put(entry.getKey(), entry.getValue());
+        lastCommitWithin = commitWithin;
+        lastOverwrite = overwrite;
+      }
+    }
+
+    if (docIterator != null) {
+      // Iterator-supplied documents have no per-document settings; they all go
+      // into one dedicated group. Note that this consumes the iterator.
+      docList = new LinkedHashMap<SolrInputDocument,Map<String,Object>>();
+      docLists.add(docList);
+      while (docIterator.hasNext()) {
+        SolrInputDocument doc = docIterator.next();
+        if (doc != null) {
+          docList.put(doc, null);
+        }
+      }
+
+    }
+
+    return docLists;
+  }
+  
   /**
    * @since solr 1.4
    */
-  public void writeXML( Writer writer ) throws IOException {
-    if( (documents != null && documents.size() > 0) || docIterator != null) {
-      if( commitWithin > 0 ) {
-        writer.write("<add commitWithin=\""+commitWithin+"\">");
-      }
-      else {
-        writer.write("<add>");
-      }
-      if(documents != null) {
-        for (SolrInputDocument doc : documents) {
-          if (doc != null) {
-            ClientUtils.writeXML(doc, writer);
-          }
+  public void writeXML(Writer writer) throws IOException {
+    List<Map<SolrInputDocument,Map<String,Object>>> getDocLists = getDocLists(documents);
+    
+    for (Map<SolrInputDocument,Map<String,Object>> docs : getDocLists) {
+      
+      if ((docs != null && docs.size() > 0)) {
+        Entry<SolrInputDocument,Map<String,Object>> firstDoc = docs.entrySet()
+            .iterator().next();
+        Map<String,Object> map = firstDoc.getValue();
+        Integer cw = null;
+        Boolean ow = null;
+        if (map != null) {
+          cw = (Integer) firstDoc.getValue().get(COMMIT_WITHIN);
+          ow = (Boolean) firstDoc.getValue().get(OVERWRITE);
         }
-      }
-      if (docIterator != null) {
-        while (docIterator.hasNext()) {
-          SolrInputDocument doc = docIterator.next();
-          if (doc != null) {
-            ClientUtils.writeXML(doc, writer);
-          }
+        if (ow == null) ow = true;
+        int commitWithin = (cw != null && cw != -1) ? cw : this.commitWithin;
+        boolean overwrite = ow;
+        if (commitWithin > -1 || overwrite != true) {
+          writer.write("<add commitWithin=\"" + commitWithin + "\" "
+              + "overwrite=\"" + overwrite + "\">");
+        } else {
+          writer.write("<add>");
+        }
+        
+        Set<Entry<SolrInputDocument,Map<String,Object>>> entries = docs
+            .entrySet();
+        for (Entry<SolrInputDocument,Map<String,Object>> entry : entries) {
+          ClientUtils.writeXML(entry.getKey(), writer);
         }
+        
+        writer.write("</add>");
       }
-      writer.write("</add>");
     }
     
     // Add the delete commands
     boolean deleteI = deleteById != null && deleteById.size() > 0;
     boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
-    if( deleteI || deleteQ ) {
-      if(commitWithin>0) {
-        writer.append( "<delete commitWithin=\"" + commitWithin + "\">" );
+    if (deleteI || deleteQ) {
+      if (commitWithin > 0) {
+        writer.append("<delete commitWithin=\"" + commitWithin + "\">");
       } else {
-        writer.append( "<delete>" );
+        writer.append("<delete>");
       }
-      if( deleteI ) {
-        for( String id : deleteById ) {
-          writer.append( "<id>" );
-          XML.escapeCharData( id, writer );
-          writer.append( "</id>" );
+      if (deleteI) {
+        for (Map.Entry<String,Map<String,Object>> entry : deleteById.entrySet()) {
+          writer.append("<id");
+          Map<String,Object> map = entry.getValue();
+          if (map != null) {
+            Long version = (Long) map.get(VER);
+            if (version != null) {
+              writer.append(" version=\"" + version + "\"");
+            }
+          }
+          writer.append(">");
+          
+          XML.escapeCharData(entry.getKey(), writer);
+          writer.append("</id>");
         }
       }
-      if( deleteQ ) {
-        for( String q : deleteQuery ) {
-          writer.append( "<query>" );
-          XML.escapeCharData( q, writer );
-          writer.append( "</query>" );
+      if (deleteQ) {
+        for (String q : deleteQuery) {
+          writer.append("<query>");
+          XML.escapeCharData(q, writer);
+          writer.append("</query>");
         }
       }
-      writer.append( "</delete>" );
+      writer.append("</delete>");
     }
   }
-
-
-  //--------------------------------------------------------------------------
-  //--------------------------------------------------------------------------
-
-  //--------------------------------------------------------------------------
-  // 
-  //--------------------------------------------------------------------------
-
+  
+  // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
+  
+  // --------------------------------------------------------------------------
+  //
+  // --------------------------------------------------------------------------
+  
   public List<SolrInputDocument> getDocuments() {
+    if (documents == null) return null;
+    List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(documents.size());
+    docs.addAll(documents.keySet());
+    return docs;
+  }
+  
+  public Map<SolrInputDocument,Map<String,Object>> getDocumentsMap() {
     return documents;
   }
-
+  
   public Iterator<SolrInputDocument> getDocIterator() {
     return docIterator;
   }
-
+  
   public List<String> getDeleteById() {
+    if (deleteById == null) return null;
+    List<String> deletes = new ArrayList<String>(deleteById.keySet());
+    return deletes;
+  }
+  
+  public Map<String,Map<String,Object>> getDeleteByIdMap() {
     return deleteById;
   }
-
+  
   public List<String> getDeleteQuery() {
     return deleteQuery;
   }
-
+  
 }

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/client/solrj/util/ClientUtils.java Mon Oct 21 18:58:24 2013
@@ -133,9 +133,11 @@ public class ClientUtils 
         }
       }
     }
-    
-    for (SolrInputDocument childDocument : doc.getChildDocuments()) {
-      writeXML(childDocument, writer);
+
+    if (doc.hasChildDocuments()) {
+      for (SolrInputDocument childDocument : doc.getChildDocuments()) {
+        writeXML(childDocument, writer);
+      }
     }
     
     writer.write("</doc>");

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java Mon Oct 21 18:58:24 2013
@@ -43,12 +43,10 @@ public class SolrInputDocument implement
   
   public SolrInputDocument() {
     _fields = new LinkedHashMap<String,SolrInputField>();
-    _childDocuments = new ArrayList<SolrInputDocument>();
   }
   
   public SolrInputDocument(Map<String,SolrInputField> fields) {
     _fields = fields;
-    _childDocuments = new ArrayList<SolrInputDocument>();
   }
   
   /**
@@ -60,9 +58,7 @@ public class SolrInputDocument implement
     if( _fields != null ) {
       _fields.clear();      
     }
-    if (_childDocuments != null) {
-      _childDocuments.clear();
-    }
+    _childDocuments = null;
   }
 
   ///////////////////////////////////////////////////////////////////
@@ -198,7 +194,9 @@ public class SolrInputDocument implement
   @Override
   public String toString()
   {
-    return "SolrInputDocument(fields: " + _fields.values() + ", childs: " + _childDocuments + ")";
+    return "SolrInputDocument(fields: " + _fields.values()
+        + ( _childDocuments == null ? "" : (", children: " + _childDocuments) )
+        + ")";
   }
   
   public SolrInputDocument deepCopy() {
@@ -208,11 +206,13 @@ public class SolrInputDocument implement
       clone._fields.put(fieldEntry.getKey(), fieldEntry.getValue().deepCopy());
     }
     clone._documentBoost = _documentBoost;
-    
-    clone._childDocuments = new ArrayList<SolrInputDocument>(_childDocuments.size());
-    for (SolrInputDocument child : _childDocuments) {
-      clone._childDocuments.add(child.deepCopy());  
-    }    
+
+    if (_childDocuments != null) {
+      clone._childDocuments = new ArrayList<SolrInputDocument>(_childDocuments.size());
+      for (SolrInputDocument child : _childDocuments) {
+        clone._childDocuments.add(child.deepCopy());
+      }
+    }
     
     return clone;
   }
@@ -277,6 +277,9 @@ public class SolrInputDocument implement
   }
   
   public void addChildDocument(SolrInputDocument child) {
+   if (_childDocuments == null) {
+     _childDocuments = new ArrayList<SolrInputDocument>();
+   }
     _childDocuments.add(child);
   }
   
@@ -285,7 +288,8 @@ public class SolrInputDocument implement
       addChildDocument(child);
     }
   }
-  
+
+  /** Returns the list of child documents, or null if none. */
   public List<SolrInputDocument> getChildDocuments() {
     return _childDocuments;
   }

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Mon Oct 21 18:58:24 2013
@@ -260,7 +260,18 @@ public class ClusterState implements JSO
       objs.remove(DocCollection.SHARDS);
     }
 
-    DocRouter router = DocRouter.getDocRouter(props.get(DocCollection.DOC_ROUTER));
+    Object routerObj = props.get(DocCollection.DOC_ROUTER);
+    DocRouter router;
+    if (routerObj == null) {
+      router = DocRouter.DEFAULT;
+    } else if (routerObj instanceof String) {
+      // back compat with Solr4.4
+      router = DocRouter.getDocRouter((String)routerObj);
+    } else {
+      Map routerProps = (Map)routerObj;
+      router = DocRouter.getDocRouter(routerProps.get("name"));
+    }
+
     return new DocCollection(name, slices, props, router);
   }
 

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java Mon Oct 21 18:58:24 2013
@@ -17,6 +17,7 @@ package org.apache.solr.common.cloud;
  * limitations under the License.
  */
 
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.Hash;
@@ -33,10 +34,10 @@ import java.util.List;
 public class CompositeIdRouter extends HashBasedRouter {
   public static final String NAME = "compositeId";
 
-  private int separator = '!';
+  public static final int separator = '!';
 
   // separator used to optionally specify number of bits to allocate toward first part.
-  private int bitsSeparator = '/';
+  public static final int bitsSeparator = '/';
   private int bits = 16;
   private int mask1 = 0xffff0000;
   private int mask2 = 0x0000ffff;
@@ -59,17 +60,23 @@ public class CompositeIdRouter extends H
   }
 
   @Override
-  public int sliceHash(String id, SolrInputDocument doc, SolrParams params) {
+  public int sliceHash(String id, SolrInputDocument doc, SolrParams params, DocCollection collection) {
+    String shardFieldName = getRouteField(collection);
+    if (shardFieldName != null && doc != null) {
+      Object o = doc.getFieldValue(shardFieldName);
+      if (o == null)
+        throw new SolrException (SolrException.ErrorCode.BAD_REQUEST, "No value for :"+shardFieldName + ". Unable to identify shard");
+      id = o.toString();
+    }
     int idx = id.indexOf(separator);
     if (idx < 0) {
       return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
     }
-
+    String part1 = id.substring(0, idx);
+    int commaIdx = part1.indexOf(bitsSeparator);
     int m1 = mask1;
     int m2 = mask2;
 
-    String part1 = id.substring(0,idx);
-    int commaIdx = part1.indexOf(bitsSeparator);
     if (commaIdx > 0) {
       int firstBits = getBits(part1, commaIdx);
       if (firstBits >= 0) {
@@ -86,6 +93,32 @@ public class CompositeIdRouter extends H
     return (hash1 & m1) | (hash2 & m2);
   }
 
+  /**
+   * Computes the hash range covered by a route key.
+   * <p>
+   * A key without the {@code separator} character maps to a single-value
+   * range holding the key's slice hash. Otherwise the part before the
+   * separator is hashed; an optional {@code bitsSeparator} suffix on that part
+   * selects how many leading bits of the hash are fixed, and the remaining
+   * bits span the range.
+   *
+   * @param routeKey the route key to inspect
+   * @return the inclusive range of hashes the key can produce
+   */
+  public Range keyHashRange(String routeKey) {
+    final int sepIdx = routeKey.indexOf(separator);
+    if (sepIdx < 0) {
+      // No separator at all: the key hashes to exactly one value.
+      final int single = sliceHash(routeKey, null, null, null);
+      return new Range(single, single);
+    }
+
+    String prefix = routeKey.substring(0, sepIdx);
+    int upperMask = mask1;
+    int lowerMask = mask2;
+
+    final int bitsIdx = prefix.indexOf(bitsSeparator);
+    if (bitsIdx > 0) {
+      final int firstBits = getBits(prefix, bitsIdx);
+      if (firstBits >= 0) {
+        // Shifting an int by 32 is a no-op in Java, hence the explicit
+        // handling of the 0-bit and 32-bit extremes.
+        upperMask = (firstBits == 0) ? 0 : (-1 << (32 - firstBits));
+        lowerMask = (firstBits == 32) ? 0 : (-1 >>> firstBits);
+        prefix = prefix.substring(0, bitsIdx);
+      }
+    }
+
+    final int prefixHash = Hash.murmurhash3_x86_32(prefix, 0, prefix.length(), 0);
+    final int min = prefixHash & upperMask;
+    return new Range(min, min | lowerMask);
+  }
+
   @Override
   public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
     if (shardKey == null) {
@@ -142,6 +175,27 @@ public class CompositeIdRouter extends H
     return targetSlices;
   }
 
+  /**
+   * Splits {@code range} into sub-ranges around the hash range of {@code key}.
+   * <ul>
+   * <li>key range equals {@code range}: a singleton list with the key range</li>
+   * <li>key range is a subset of {@code range}: three pieces (before, the key
+   * range itself, after)</li>
+   * <li>otherwise: two pieces, cut either just after the key range's max or
+   * just before its min, depending on which end lies inside {@code range}</li>
+   * </ul>
+   * NOTE(review): when the key range shares an endpoint with {@code range},
+   * one of the pieces appears to be built with min greater than max - confirm
+   * that {@code Range} and the callers tolerate this.
+   *
+   * @param key the route key whose hash range drives the split
+   * @param range the range to partition
+   * @return the resulting sub-ranges, in ascending order
+   * @throws IllegalArgumentException if the key's hash range does not overlap
+   *         {@code range}
+   */
+  public List<Range> partitionRangeByKey(String key, Range range) {
+    final Range keyRange = keyHashRange(key);
+    if (!keyRange.overlaps(range)) {
+      throw new IllegalArgumentException("Key range does not overlap given range");
+    }
+    if (keyRange.equals(range)) {
+      return Collections.singletonList(keyRange);
+    }
+
+    final List<Range> parts = new ArrayList<Range>(3);
+    if (keyRange.isSubsetOf(range)) {
+      // Key range lies strictly inside: before / key / after.
+      parts.add(new Range(range.min, keyRange.min - 1));
+      parts.add(keyRange);
+      parts.add(new Range(keyRange.max + 1, range.max));
+    } else if (range.includes(keyRange.max)) {
+      // Key range hangs over the lower end: cut right after its max.
+      parts.add(new Range(range.min, keyRange.max));
+      parts.add(new Range(keyRange.max + 1, range.max));
+    } else {
+      // Key range hangs over the upper end: cut right before its min.
+      parts.add(new Range(range.min, keyRange.min - 1));
+      parts.add(new Range(keyRange.min, range.max));
+    }
+    return parts;
+  }
 
   @Override
   public List<Range> partitionRange(int partitions, Range range) {

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Mon Oct 21 18:58:24 2013
@@ -37,16 +37,14 @@ class ConnectionManager implements Watch
   private boolean connected;
 
   private final ZkClientConnectionStrategy connectionStrategy;
-  
-  private Object connectionUpdateLock = new Object();
 
-  private String zkServerAddress;
+  private final String zkServerAddress;
 
-  private int zkClientTimeout;
+  private final int zkClientTimeout;
 
-  private SolrZkClient client;
+  private final SolrZkClient client;
 
-  private OnReconnect onReconnect;
+  private final OnReconnect onReconnect;
 
   private volatile boolean isClosed = false;
 
@@ -92,37 +90,35 @@ class ConnectionManager implements Watch
             new ZkClientConnectionStrategy.ZkUpdate() {
               @Override
               public void update(SolrZooKeeper keeper) {
-                // if keeper does not replace oldKeeper we must be sure to close it
-                synchronized (connectionUpdateLock) {
-                  try {
-                    waitForConnected(Long.MAX_VALUE);
-                  } catch (Exception e1) {
-                    closeKeeper(keeper);
-                    throw new RuntimeException(e1);
-                  }
-                  log.info("Connection with ZooKeeper reestablished.");
-                  try {
-                    client.updateKeeper(keeper);
-                  } catch (InterruptedException e) {
-                    closeKeeper(keeper);
-                    Thread.currentThread().interrupt();
-                    // we must have been asked to stop
-                    throw new RuntimeException(e);
-                  } catch(Throwable t) {
-                    closeKeeper(keeper);
-                    throw new RuntimeException(t);
-                  }
-      
-                  if (onReconnect != null) {
-                    onReconnect.command();
-                  }
-                  synchronized (ConnectionManager.this) {
-                    ConnectionManager.this.connected = true;
-                  }
+                try {
+                  waitForConnected(Long.MAX_VALUE);
+                } catch (Exception e1) {
+                  closeKeeper(keeper);
+                  throw new RuntimeException(e1);
+                }
+                
+                log.info("Connection with ZooKeeper reestablished.");
+                try {
+                  client.updateKeeper(keeper);
+                } catch (InterruptedException e) {
+                  closeKeeper(keeper);
+                  Thread.currentThread().interrupt();
+                  // we must have been asked to stop
+                  throw new RuntimeException(e);
+                } catch (Throwable t) {
+                  closeKeeper(keeper);
+                  throw new RuntimeException(t);
+                }
+                
+                if (onReconnect != null) {
+                  onReconnect.command();
+                }
+                
+                synchronized (ConnectionManager.this) {
+                  ConnectionManager.this.connected = true;
                 }
                 
               }
-
             });
       } catch (Exception e) {
         SolrException.log(log, "", e);

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Mon Oct 21 18:58:24 2013
@@ -34,21 +34,40 @@ public class DefaultConnectionStrategy e
   
   @Override
   public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
-    updater.update(new SolrZooKeeper(serverAddress, timeout, watcher));
+    SolrZooKeeper zk = new SolrZooKeeper(serverAddress, timeout, watcher);
+    boolean success = false;
+    try {
+      updater.update(zk);
+      success = true;
+    } finally {
+      if (!success) {
+        zk.close();
+      }
+    }
   }
 
   @Override
   public void reconnect(final String serverAddress, final int zkClientTimeout,
       final Watcher watcher, final ZkUpdate updater) throws IOException {
     log.info("Connection expired - starting a new one...");
-    
+    SolrZooKeeper zk = new SolrZooKeeper(serverAddress, zkClientTimeout, watcher);
+    boolean success = false;
     try {
       updater
-          .update(new SolrZooKeeper(serverAddress, zkClientTimeout, watcher));
+          .update(zk);
+      success = true;
       log.info("Reconnected to ZooKeeper");
     } catch (Exception e) {
       SolrException.log(log, "Reconnect to ZooKeeper failed", e);
       log.info("Reconnect to ZooKeeper failed");
+    } finally {
+      if (!success) {
+        try {
+          zk.close();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
     }
     
   }

Modified: lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1534320&r1=1534319&r2=1534320&view=diff
==============================================================================
--- lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/branches/lucene4956/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Mon Oct 21 18:58:24 2013
@@ -45,7 +45,7 @@ public class DocCollection extends ZkNod
    * @param props  The properties of the slice.  This is used directly and a copy is not made.
    */
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    super( props==null ? Collections.<String,Object>emptyMap() : props);
+    super( props==null ? props = new HashMap<String,Object>() : props);
     this.name = name;
 
     this.slices = slices;