You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/10/10 01:46:00 UTC

svn commit: r1180745 [1/2] - in /lucene/dev/branches/solrcloud: solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/ solr/core/src/java/org/apache/solr/client/solrj/embedded/ solr/core/src/java/org/apache/solr/cloud/ solr/core/sr...

Author: markrmiller
Date: Sun Oct  9 23:45:59 2011
New Revision: 1180745

URL: http://svn.apache.org/viewvc?rev=1180745&view=rev
Log:
SOLR-2358: Distributing Indexing - early infrastructure and tests

Added:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java   (with props)
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java   (with props)
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java   (with props)
    lucene/dev/branches/solrcloud/src/
Modified:
    lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
    lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java
    lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
    lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java
    lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/BaseDistributedSearchTestCase.java
    lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrJettyTestBase.java

Modified: lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java (original)
+++ lucene/dev/branches/solrcloud/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestContentStreamDataSource.java Sun Oct  9 23:45:59 2011
@@ -145,9 +145,8 @@ public class TestContentStreamDataSource
   }
 
   private JettySolrRunner createJetty(SolrInstance instance) throws Exception {
-    System.setProperty("solr.solr.home", instance.getHomeDir());
     System.setProperty("solr.data.dir", instance.getDataDir());
-    JettySolrRunner jetty = new JettySolrRunner("/solr", 0);
+    JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
     jetty.start();
     return jetty;
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java Sun Oct  9 23:45:59 2011
@@ -51,19 +51,20 @@ public class JettySolrRunner {
 
   private boolean waitOnSolr = false;
 
-  public JettySolrRunner(String context, int port) {
-    this.init(context, port);
+  public JettySolrRunner(String solrHome, String context, int port) {
+    this.init(solrHome, context, port);
   }
 
-  public JettySolrRunner(String context, int port, String solrConfigFilename) {
-    this.init(context, port);
+  public JettySolrRunner(String solrHome, String context, int port, String solrConfigFilename) {
+    this.init(solrHome, context, port);
     this.solrConfigFilename = solrConfigFilename;
   }
 
-  private void init(String context, int port) {
+  private void init(String solrHome, String context, int port) {
     this.context = context;
     server = new Server(port);
     server.setStopAtShutdown(true);
+    System.setProperty("solr.solr.home", solrHome);
     if (System.getProperty("jetty.testMode") != null) {
       // SelectChannelConnector connector = new SelectChannelConnector();
       // Normal SocketConnector is what solr's example server uses by default
@@ -99,6 +100,8 @@ public class JettySolrRunner {
             Handler.REQUEST);
         if (solrConfigFilename != null)
           System.clearProperty("solrconfig");
+
+        System.clearProperty("solr.solr.home");
       }
 
       public void lifeCycleFailure(LifeCycle arg0, Throwable arg1) {
@@ -172,7 +175,7 @@ public class JettySolrRunner {
    */
   public static void main(String[] args) {
     try {
-      JettySolrRunner jetty = new JettySolrRunner("/solr", 8983);
+      JettySolrRunner jetty = new JettySolrRunner(".", "/solr", 8983);
       jetty.start();
     } catch (Exception ex) {
       ex.printStackTrace();

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/SliceLeaderElector.java Sun Oct  9 23:45:59 2011
@@ -17,6 +17,7 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,6 +28,7 @@ import java.util.regex.Pattern;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.zookeeper.CreateMode;
@@ -77,13 +79,15 @@ public class SliceLeaderElector {
    * @param collection
    * @param seq
    * @param leaderId
+   * @param props node properties stored as the data of the leader node; may be null
    * @throws KeeperException
    * @throws InterruptedException
+   * @throws IOException 
    * @throws UnsupportedEncodingException
    */
   private void checkIfIamLeader(final String shardId, final String collection,
-      final int seq, final String leaderId) throws KeeperException,
-      InterruptedException {
+      final int seq, final String leaderId, final ZkNodeProps props) throws KeeperException,
+      InterruptedException, IOException {
     // get all other numbers...
     String holdElectionPath = getElectionPath(shardId, collection)
         + ELECTION_NODE;
@@ -91,7 +95,7 @@ public class SliceLeaderElector {
     sortSeqs(seqs);
     List<Integer> intSeqs = getSeqs(seqs);
     if (seq <= intSeqs.get(0)) {
-      runIamLeaderProcess(shardId, collection, leaderId);
+      runIamLeaderProcess(shardId, collection, leaderId, props);
     } else {
       // I am not the leader - watch the node below me
       int i = 1;
@@ -111,7 +115,7 @@ public class SliceLeaderElector {
               public void process(WatchedEvent event) {
                 // am I the next leader?
                 try {
-                  checkIfIamLeader(shardId, collection, seq, leaderId);
+                  checkIfIamLeader(shardId, collection, seq, leaderId, props);
                 } catch (KeeperException e) {
                   log.warn("", e);
                   
@@ -119,6 +123,8 @@ public class SliceLeaderElector {
                   // Restore the interrupted status
                   Thread.currentThread().interrupt();
                   log.warn("", e);
+                } catch (IOException e) {
+                  log.warn("", e);
                 }
               }
               
@@ -126,17 +132,18 @@ public class SliceLeaderElector {
       } catch (KeeperException e) {
         // we couldn't set our watch - the node before us may already be down?
         // we need to check if we are the leader again
-        checkIfIamLeader(shardId, collection, seq, leaderId);
+        checkIfIamLeader(shardId, collection, seq, leaderId, props);
       }
     }
   }
 
   private void runIamLeaderProcess(final String shardId,
-      final String collection, final String leaderId) throws KeeperException,
-      InterruptedException {
+      final String collection, final String leaderId, ZkNodeProps props) throws KeeperException,
+      InterruptedException, IOException {
     String currentLeaderZkPath = getElectionPath(shardId, collection)
         + LEADER_NODE;
-    zkClient.makePath(currentLeaderZkPath + "/" + leaderId,  CreateMode.EPHEMERAL);
+    // TODO: leader election tests do not currently set the props
+    zkClient.makePath(currentLeaderZkPath + "/" + leaderId, props == null ? null : props.store(), CreateMode.EPHEMERAL);
   }
   
   /**
@@ -193,14 +200,15 @@ public class SliceLeaderElector {
    * @param shardId
    * @param collection
    * @param shardZkNodeName
+   * @param props node properties handed to the leader check and stored on the leader node; may be null
    * @return sequential node number
    * @throws KeeperException
    * @throws InterruptedException
+   * @throws IOException 
    * @throws UnsupportedEncodingException
    */
   public int joinElection(String shardId, String collection,
-      String shardZkNodeName) throws KeeperException, InterruptedException,
-      UnsupportedEncodingException {
+      String shardZkNodeName, ZkNodeProps props) throws KeeperException, InterruptedException, IOException {
     final String shardsElectZkPath = getElectionPath(shardId, collection)
         + SliceLeaderElector.ELECTION_NODE;
     
@@ -209,6 +217,7 @@ public class SliceLeaderElector {
     int tries = 0;
     while (cont) {
       try {
+        
         leaderSeqPath = zkClient.create(shardsElectZkPath + "/n_", null,
             CreateMode.EPHEMERAL_SEQUENTIAL);
         cont = false;
@@ -224,7 +233,7 @@ public class SliceLeaderElector {
       }
     }
     int seq = getSeq(leaderSeqPath);
-    checkIfIamLeader(shardId, collection, seq, shardZkNodeName);
+    checkIfIamLeader(shardId, collection, seq, shardZkNodeName, props);
     
     return seq;
   }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Oct  9 23:45:59 2011
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.util.Iterator;
 import java.util.List;
@@ -500,6 +499,7 @@ public final class ZkController {
     if (shardId == null) {
       shardId = assignShard.assignShard(collection, 3); // nocommit: hard coded
                                                         // number of slices
+      cloudDesc.setShardId(shardId);
     }
     String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
 
@@ -510,10 +510,7 @@ public final class ZkController {
           + shardUrl);
     }
 
-    ZkNodeProps props = new ZkNodeProps();
-    props.put(ZkStateReader.URL_PROP, shardUrl);
-    
-    props.put(ZkStateReader.NODE_NAME, getNodeName());
+    ZkNodeProps props = getShardZkProps(shardUrl);
 
     byte[] bytes = props.store();
     
@@ -544,15 +541,24 @@ public final class ZkController {
     }
     
     // leader election
-    doLeaderElectionProcess(shardId, collection, shardZkNodeName);
+    doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
     return shardId;
   }
 
+
+  private ZkNodeProps getShardZkProps(String shardUrl) {
+    ZkNodeProps props = new ZkNodeProps();
+    props.put(ZkStateReader.URL_PROP, shardUrl);
+    
+    props.put(ZkStateReader.NODE_NAME, getNodeName());
+    return props;
+  }
+
   private void doLeaderElectionProcess(String shardId,
-      final String collection, String shardZkNodeName) throws KeeperException,
-      InterruptedException, UnsupportedEncodingException {
+      final String collection, String shardZkNodeName, ZkNodeProps props) throws KeeperException,
+      InterruptedException, IOException {
    
-    leaderElector.joinElection(shardId, collection, shardZkNodeName);
+    leaderElector.joinElection(shardId, collection, shardZkNodeName, props);
   }
 
   /**

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java Sun Oct  9 23:45:59 2011
@@ -47,7 +47,6 @@ public class CoreDescriptor {
       this.cloudDesc = new CloudDescriptor();
       // cloud collection defaults to core name
       cloudDesc.setCollectionName(name.isEmpty() ? coreContainer.getDefaultCoreName() : name);
-      this.cloudDesc.setShardId(coreContainer.getZkController().getNodeName() + "_" + name);
     }
     
     if (name == null) {

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Oct  9 23:45:59 2011
@@ -0,0 +1,439 @@
+package org.apache.solr.update.processor;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+
+// NOT mt-safe... create a new processor for each add thread
+public class DistributedUpdateProcessor extends UpdateRequestProcessor {
+  // TODO: shut this thing down
+  static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+      5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+  
+  static HttpClient client;
+  
+  static {
+    MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+    mgr.getParams().setDefaultMaxConnectionsPerHost(8);
+    mgr.getParams().setMaxTotalConnections(200);
+    client = new HttpClient(mgr);
+  }
+  
+  CompletionService<Request> completionService;
+  Set<Future<Request>> pending;
+  
+  private final SolrQueryRequest req;
+  private final SolrQueryResponse rsp;
+  private final UpdateRequestProcessor next;;
+  private final SchemaField idField;
+  
+  private List<String> shards;
+  private final List<AddUpdateCommand>[] adds;
+  private final List<DeleteUpdateCommand>[] deletes;
+  
+  String selfStr;
+  int self;
+  int maxBufferedAddsPerServer = 10;
+  int maxBufferedDeletesPerServer = 100;
+  
+  private DistributedUpdateProcessorFactory factory;
+  
+  /**
+   * Creates a processor that routes updates across a list of shards.
+   *
+   * <p>The shard list is taken from {@code shardStr} (comma separated) when it
+   * is non-null, otherwise from the factory. This node's identity is read from
+   * the {@code self} request parameter, falling back to the factory's value.
+   * {@code self} becomes the index of that identity within the shard list, or
+   * -1 when it is not listed. When there is no shard list at all, a single
+   * placeholder shard is installed and {@code self} is 0, so every update is
+   * handled locally.
+   *
+   * @param shardStr comma separated shard list overriding the factory's, or null
+   * @param req the update request being processed
+   * @param rsp the response on which the first shard error is recorded
+   * @param factory supplies the default shard list and self identity
+   * @param next the next processor in the chain; checked for null before use
+   */
+  @SuppressWarnings("unchecked") // generic array creation for the per-shard buffers
+  public DistributedUpdateProcessor(String shardStr, SolrQueryRequest req,
+      SolrQueryResponse rsp, DistributedUpdateProcessorFactory factory,
+      UpdateRequestProcessor next) {
+    super(next);
+    this.factory = factory;
+    this.req = req;
+    this.rsp = rsp;
+    this.next = next;
+    this.idField = req.getSchema().getUniqueKeyField();
+
+    shards = factory.shards;
+
+    // Store the identity on the field rather than in a shadowing local, so the
+    // resolved value is retained on the instance.
+    this.selfStr = req.getParams().get("self", factory.selfStr);
+
+    if (shardStr != null) {
+      shards = StrUtils.splitSmart(shardStr, ",", true);
+    }
+
+    self = -1;
+    if (shards != null) {
+      for (int i = 0; i < shards.size(); i++) {
+        if (shards.get(i).equals(this.selfStr)) {
+          self = i;
+          break;
+        }
+      }
+    }
+
+    if (shards == null) {
+      // no shard list configured: a single placeholder slot owned by this node
+      shards = new ArrayList<String>(1);
+      shards.add("self");
+      self = 0;
+    }
+
+    adds = new List[shards.size()];
+    deletes = new List[shards.size()];
+  }
+  
+  private int getSlot(String id) {
+    return (id.hashCode() >>> 1) % shards.size();
+  }
+  
+  /**
+   * Routes an add either to the local chain or into the buffer of the shard
+   * selected by the document's unique key.
+   */
+  @Override
+  public void processAdd(AddUpdateCommand cmd) throws IOException {
+    checkResponses(false);
+
+    final SolrInputField keyField =
+        cmd.getSolrInputDocument().getField(idField.getName());
+    // a document without a unique key cannot be routed, so it is treated as local
+    final int target = keyField == null
+        ? self
+        : getSlot(keyField.getFirstValue().toString());
+    if (target == self) {
+      if (next != null) {
+        next.processAdd(cmd);
+      }
+      return;
+    }
+
+    // deletes already queued for this shard must be sent before the add
+    flushDeletes(target, 1, null);
+
+    // TODO: this is brittle
+    // the incoming command may be reused, so a copy is what gets buffered
+    final AddUpdateCommand copy = new AddUpdateCommand(req);
+    copy.overwrite = cmd.overwrite;
+    copy.commitWithin = cmd.commitWithin;
+    copy.solrDoc = cmd.solrDoc;
+
+    // nocommit: review as far as SOLR-2685
+    // copy.indexedId = cmd.indexedId;
+    // copy.doc = cmd.doc;
+
+    if (adds[target] == null) {
+      adds[target] = new ArrayList<AddUpdateCommand>(2);
+    }
+    adds[target].add(copy);
+
+    flushAdds(target, maxBufferedAddsPerServer, null);
+  }
+  
+  // TODO: this is brittle
+  private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
+    DeleteUpdateCommand c = new DeleteUpdateCommand(req);
+    c.id = cmd.id;
+    c.query = cmd.query;
+    return c;
+  }
+  
+  /**
+   * Applies a delete to one slot: locally when the slot is this node,
+   * otherwise by buffering a copy for the remote shard.
+   *
+   * @param slot index into the shard list; may equal {@code self}, including
+   *             -1 when this node is not part of the shard list
+   * @param cmd the delete to apply
+   * @throws IOException if the local chain fails
+   */
+  private void doDelete(int slot, DeleteUpdateCommand cmd) throws IOException {
+    if (slot == self) {
+      // Forward locally only when this node really is a shard (self >= 0) and
+      // a downstream processor exists, matching the guards used by the other
+      // forwarding sites in this class.
+      if (self >= 0 && next != null) next.processDelete(cmd);
+      return;
+    }
+
+    // adds already queued for this shard must be sent before the delete
+    flushAdds(slot, 1, null);
+
+    List<DeleteUpdateCommand> dlist = deletes[slot];
+    if (dlist == null) {
+      dlist = new ArrayList<DeleteUpdateCommand>(2);
+      deletes[slot] = dlist;
+    }
+    dlist.add(clone(cmd));
+
+    flushDeletes(slot, maxBufferedDeletesPerServer, null);
+  }
+  
+  /**
+   * Sends a delete-by-id to the slot chosen by the id, and a delete-by-query
+   * to every slot, remote ones first and this node last.
+   */
+  @Override
+  public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    checkResponses(false);
+
+    if (cmd.id != null) {
+      doDelete(getSlot(cmd.id), cmd);
+      return;
+    }
+    if (cmd.query == null) {
+      return;
+    }
+
+    // query must be broadcast to all
+    for (int slot = 0; slot < deletes.length; slot++) {
+      if (slot != self) {
+        doDelete(slot, cmd);
+      }
+    }
+    doDelete(self, cmd);
+  }
+  
+  /**
+   * Forwards a commit to every remote shard and then to the local chain.
+   */
+  @Override
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    // Wait for all outstanding responses to make sure that a commit
+    // can't sneak in ahead of adds or deletes we already sent.
+    // We could do this on a per-server basis, but it's more complex
+    // and this solution will lead to commits happening closer together.
+    checkResponses(true);
+
+    for (int slot = 0; slot < shards.size(); slot++) {
+      // skip this node; for remote shards, piggyback the commit on any
+      // outstanding adds or deletes if possible
+      if (slot == self || flushAdds(slot, 1, cmd) || flushDeletes(slot, 1, cmd)) {
+        continue;
+      }
+
+      final UpdateRequestExt commitReq = new UpdateRequestExt();
+      if (commitReq.getParams() == null) {
+        commitReq.setParams(new ModifiableSolrParams());
+      }
+      // pass on version
+      final String docVersion =
+          req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION);
+      if (docVersion != null) {
+        commitReq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
+            docVersion);
+      }
+      commitReq.getParams().add("update.chain", "distrib-update-chain");
+      addCommit(commitReq, cmd);
+      submit(slot, commitReq);
+    }
+    if (self >= 0 && next != null) {
+      next.processCommit(cmd);
+    }
+
+    // if the command wanted to block until everything was committed,
+    // then do that here.
+    // nocommit
+    if (/* cmd.waitFlush || */cmd.waitSearcher) {
+      checkResponses(true);
+    }
+  }
+  
+  /**
+   * Sends whatever is still buffered for remote shards, waits for all
+   * outstanding responses, then finishes the local chain.
+   */
+  @Override
+  public void finish() throws IOException {
+    final int shardCount = shards.size();
+    for (int slot = 0; slot < shardCount; slot++) {
+      if (slot != self) {
+        // a limit of 1 flushes any non-empty buffer
+        flushAdds(slot, 1, null);
+        flushDeletes(slot, 1, null);
+      }
+    }
+    checkResponses(true);
+    if (self >= 0 && next != null) {
+      next.finish();
+    }
+  }
+  
+  /**
+   * Drains completed shard requests and records the first failure on the
+   * response.
+   *
+   * @param block when true, waits until every pending request has completed;
+   *              when false, only consumes requests that are already done
+   * @throws SolrException (SERVICE_UNAVAILABLE) if the thread is interrupted
+   *         while waiting; the interrupt flag is restored before throwing
+   */
+  void checkResponses(boolean block) {
+    while (pending != null && pending.size() > 0) {
+      try {
+        Future<Request> future = block ? completionService.take()
+            : completionService.poll();
+        // non-blocking mode: nothing else has finished yet
+        if (future == null) return;
+        pending.remove(future);
+
+        try {
+          Request sreq = future.get();
+          if (sreq.rspCode != 0) {
+            // error during request
+
+            // use the first exception encountered
+            if (rsp.getException() == null) {
+              Exception e = sreq.exception;
+              String newMsg = "shard update error (" + sreq.shard + "):"
+                  + e.getMessage();
+              if (e instanceof SolrException) {
+                // keep the original error code, but prefix the shard
+                SolrException se = (SolrException) e;
+                e = new SolrException(ErrorCode.getErrorCode(se.code()),
+                    newMsg, se.getCause());
+              } else {
+                // pass the composed message, not the literal text "newMsg"
+                e = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                    newMsg, e);
+              }
+              rsp.setException(e);
+            }
+
+            SolrException.logOnce(SolrCore.log, "shard update error ("
+                + sreq.shard + ")", sreq.exception);
+          }
+
+        } catch (ExecutionException e) {
+          // shouldn't happen since we catch exceptions ourselves
+          SolrException.log(SolrCore.log,
+              "error sending update request to shard", e);
+        }
+
+      } catch (InterruptedException e) {
+        // restore the interrupted status so callers further up can see it
+        Thread.currentThread().interrupt();
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+            "interrupted waiting for shard update response", e);
+      }
+    }
+  }
+  
+  void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+    if (cmd == null) return;
+    // nocommit
+    ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
+        : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
+  }
+  
+  boolean flushAdds(int slot, int limit, CommitUpdateCommand ccmd) {
+    // check for pending deletes
+    List<AddUpdateCommand> alist = adds[slot];
+    if (alist == null || alist.size() < limit) return false;
+    
+    UpdateRequestExt ureq = new UpdateRequestExt();
+    // pass on version
+    if (ureq.getParams() == null) {
+      ureq.setParams(new ModifiableSolrParams());
+    }
+    if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
+      ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
+          req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+    }
+    ureq.getParams().add("update.chain", "distrib-update-chain");
+    addCommit(ureq, ccmd);
+    
+    for (AddUpdateCommand cmd : alist) {
+      ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+    }
+    
+    adds[slot] = null;
+    submit(slot, ureq);
+    return true;
+  }
+  
+  boolean flushDeletes(int slot, int limit, CommitUpdateCommand ccmd) {
+    // check for pending deletes
+    List<DeleteUpdateCommand> dlist = deletes[slot];
+    if (dlist == null || dlist.size() < limit) return false;
+    
+    UpdateRequestExt ureq = new UpdateRequestExt();
+    // pass on version
+    if (ureq.getParams() == null) {
+      ureq.setParams(new ModifiableSolrParams());
+    }
+    if (req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION) != null) {
+      ureq.getParams().add(DistributedUpdateProcessorFactory.DOCVERSION,
+          req.getParams().get(DistributedUpdateProcessorFactory.DOCVERSION));
+    }
+    ureq.getParams().add("update.chain", "distrib-update-chain");
+    addCommit(ureq, ccmd);
+    for (DeleteUpdateCommand cmd : dlist) {
+      if (cmd.id != null) {
+        ureq.deleteById(cmd.id);
+      }
+      if (cmd.query != null) {
+        ureq.deleteByQuery(cmd.query);
+      }
+    }
+    
+    deletes[slot] = null;
+    submit(slot, ureq);
+    return true;
+  }
+  
+  /** One update sent to a shard entry, together with its outcome. */
+  static class Request {
+    // what is sent, and where
+    UpdateRequestExt ureq;
+    String shard;
+
+    // outcome: rspCode stays 0 unless a failure is recorded, in which case
+    // exception holds the cause
+    int rspCode;
+    Exception exception;
+    NamedList<Object> ursp;
+  }
+  
+  void submit(int slot, UpdateRequestExt ureq) {
+    Request sreq = new Request();
+    sreq.shard = shards.get(slot);
+    sreq.ureq = ureq;
+    submit(sreq);
+  }
+  
+  /**
+   * Asynchronously sends a request to every server named by its shard entry
+   * and tracks the resulting futures in {@code pending}.
+   *
+   * <p>A shard entry may list several servers separated by {@code |}; one task
+   * is submitted per server. Failures are not thrown from the task: they are
+   * recorded on the request as {@code exception} and a non-zero
+   * {@code rspCode} (the SolrException code, or -1 for anything else).
+   *
+   * @param sreq the request to send; its {@code shard} and {@code ureq} must be set
+   */
+  void submit(final Request sreq) {
+    // created on first use
+    if (completionService == null) {
+      completionService = new ExecutorCompletionService<Request>(commExecutor);
+      pending = new HashSet<Future<Request>>();
+    }
+    // look to see if we should send to multiple servers
+    final String[] targets;
+    if (sreq.shard.contains("|")) {
+      targets = sreq.shard.split("\\|");
+    } else {
+      targets = new String[] {sreq.shard};
+    }
+    // NOTE(review): every task for a multi-server entry writes its outcome to
+    // the same Request instance - confirm that sharing is intended.
+    for (final String shard : targets) {
+      // TODO: when we break up shards, we might forward
+      // to self again - makes things simple here, but we could
+      // also have realized this before, done the req locally, and
+      // removed self from this list.
+
+      Callable<Request> task = new Callable<Request>() {
+        @Override
+        public Request call() throws Exception {
+
+          try {
+            // Address the individual server; using the whole '|'-joined entry
+            // here would produce an invalid URL for multi-server entries.
+            final String url;
+            if (!shard.startsWith("http://")) {
+              url = "http://" + shard;
+            } else {
+              url = shard;
+            }
+
+            // TODO: allow shard syntax to use : to specify replicas
+            SolrServer server = new CommonsHttpSolrServer(url, client);
+            sreq.ursp = server.request(sreq.ureq);
+
+            // currently no way to get the request body.
+          } catch (Exception e) {
+            sreq.exception = e;
+            if (e instanceof SolrException) {
+              sreq.rspCode = ((SolrException) e).code();
+            } else {
+              sreq.rspCode = -1;
+            }
+          }
+          return sreq;
+        }
+      };
+
+      pending.add(completionService.submit(task));
+    }
+  }
+}

Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java Sun Oct  9 23:45:59 2011
@@ -0,0 +1,165 @@
+package org.apache.solr.update.processor;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.zookeeper.KeeperException;
+
+public class DistributedUpdateProcessorFactory extends
+    UpdateRequestProcessorFactory {
+  public static final String DOCVERSION = "docversion";
+  NamedList args;
+  List<String> shards;
+  String selfStr;
+  String shardsString;
+  
+  @Override
+  public void init(NamedList args) {
+    selfStr = (String) args.get("self");
+    Object configured = args.get("shards"); // a List, a String, or absent
+    if (configured instanceof List) {
+      shards = (List<String>) configured;
+      shardsString = StrUtils.join(shards, ',');
+    } else if (configured instanceof String) {
+      shards = StrUtils.splitSmart((String) configured, ",", true);
+      shardsString = (String) configured;
+    }
+  }
+  
+  /** Returns the list of shards, or null if not configured. */
+  public List<String> getShards() {
+    return shards;
+  }
+  
+  public String getShardsString() {
+    return shardsString; // null unless init found a "shards" List or String
+  }
+  
+  /** Returns "self", or null if not configured. */
+  public String getSelf() {
+    return selfStr;
+  }
+  
+  /**
+   * Creates the update processor for a request. When a ZkController is
+   * present, the request params are first rewritten from the shard leader
+   * registered in ZooKeeper (see the inline comments for the three cases).
+   * 
+   * @return null if neither the config nor the request names any shards
+   */
+  @Override
+  public DistributedUpdateProcessor getInstance(SolrQueryRequest req,
+      SolrQueryResponse rsp, UpdateRequestProcessor next) {
+    CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
+    // TODO: could do this here, or in a previous update processor.
+    // if we are in zk mode...
+    if (coreDesc.getCoreContainer().getZkController() != null) {
+      // the leader is...
+      // TODO: if there is no leader, wait and look again
+      // TODO: we are reading the leader from zk every time - we should
+      // cache this and watch for changes
+      String collection = coreDesc.getCloudDescriptor().getCollectionName();
+      String shardId = coreDesc.getCloudDescriptor().getShardId();
+      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      String leaderNode = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
+          + ZkStateReader.LEADER_ELECT_ZKNODE + "/" + shardId + "/leader";
+      SolrZkClient zkClient = coreDesc.getCoreContainer().getZkController()
+          .getZkClient();
+      try {
+        List<String> leaderChildren = zkClient.getChildren(leaderNode, null);
+        if (leaderChildren.size() > 0) {
+          String leader = leaderChildren.get(0);
+          ZkNodeProps zkNodeProps = new ZkNodeProps();
+          byte[] bytes = zkClient
+              .getData(leaderNode + "/" + leader, null, null);
+          zkNodeProps.load(bytes);
+          String leaderUrl = zkNodeProps.get("url");
+          String nodeName = req.getCore().getCoreDescriptor()
+              .getCoreContainer().getZkController().getNodeName();
+          String shardZkNodeName = nodeName + "_" + req.getCore().getName();
+          if ("yes".equals(params.get(DOCVERSION))) {
+            // we got a version, just go local
+          } else if (shardZkNodeName.equals(leader)) {
+            // that means I want to forward onto my replicas...
+            // so get the replicas...
+            CloudState cloudState = req.getCore().getCoreDescriptor()
+                .getCoreContainer().getZkController().getCloudState();
+            Slice replicas = cloudState.getSlices(collection).get(shardId);
+            Map<String,ZkNodeProps> shardMap = replicas.getShards();
+            String self = null;
+            StringBuilder replicasUrl = new StringBuilder();
+            for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
+              if (replicasUrl.length() > 0) {
+                replicasUrl.append("|");
+              }
+              String replicaUrl = entry.getValue().get("url");
+              if (shardZkNodeName.equals(entry.getKey())) {
+                self = replicaUrl;
+              }
+              replicasUrl.append(replicaUrl);
+            }
+            versionDoc(params);
+            params.add("self", self);
+            params.add("shards", replicasUrl.toString());
+          } else {
+            // I need to forward onto the leader...
+            // TODO: don't use leader - we need to get the real URL from the zk
+            // node
+            params.add("shards", leaderUrl);
+          }
+          req.setParams(params);
+        }
+      } catch (KeeperException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not read shard leader from " + leaderNode, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "Interrupted reading shard leader from " + leaderNode, e);
+      } catch (IOException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not load shard leader props from " + leaderNode, e);
+      }
+    }
+    
+    String shardStr = req.getParams().get("shards");
+    if (shards == null && shardStr == null) return null;
+    return new DistributedUpdateProcessor(shardStr, req, rsp, this, next);
+  }
+  
+  private void versionDoc(ModifiableSolrParams params) {
+    params.set(DOCVERSION, "yes"); // getInstance reads "yes" as: we got a version, just go local
+  }
+}

Added: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-distrib-update.xml Sun Oct  9 23:45:59 2011
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+
+<!-- $Id: solrconfig-nocache.xml 1144761 2011-07-09 23:01:53Z sarowe $ $Source$ 
+	$Name$ -->
+
+<config>
+    <luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
+    <dataDir>${solr.data.dir:}</dataDir>
+	<!-- The DirectoryFactory to use for indexes. solr.StandardDirectoryFactory, 
+		the default, is filesystem based. solr.RAMDirectoryFactory is memory based 
+		and not persistent. -->
+	<directoryFactory name="DirectoryFactory"
+		class="${solr.directoryFactory:solr.RAMDirectoryFactory}" />
+
+	<updateHandler class="solr.DirectUpdateHandler2">
+
+	</updateHandler>
+	<requestHandler name="standard" class="solr.StandardRequestHandler" />
+
+	<requestDispatcher handleSelect="true">
+		<requestParsers enableRemoteStreaming="true"
+			multipartUploadLimitInKB="2048" />
+		<httpCaching never304="true" />
+	</requestDispatcher>
+
+    <requestHandler name="/update"     class="solr.XmlUpdateRequestHandler"          />
+
+	<updateRequestProcessorChain name="distrib-update-chain">
+		<processor class="solr.DistributedUpdateProcessorFactory">
+			<!-- example configuration... "shards" should be in the *same* order for 
+				every server in a cluster. Only "self" should change to represent what server 
+				*this* is. <str name="self">localhost:8983/solr</str> <arr name="shards"> 
+				<str>localhost:8983/solr</str> <str>localhost:7574/solr</str> </arr> -->
+		</processor>
+		<processor class="solr.LogUpdateProcessorFactory">
+			<int name="maxNumToLog">10</int>
+		</processor>
+		<processor class="solr.RunUpdateProcessorFactory" />
+	</updateRequestProcessorChain>
+
+</config>

Modified: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/solrconfig-nocache.xml Sun Oct  9 23:45:59 2011
@@ -30,8 +30,11 @@
         solr.RAMDirectoryFactory is memory based and not persistent. -->
   <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
 
+  <dataDir>${solr.data.dir:}</dataDir>
+
   <updateHandler class="solr.DirectUpdateHandler2">
   </updateHandler>
+  
   <requestHandler name="standard" class="solr.StandardRequestHandler"/>
 
   <requestDispatcher handleSelect="true" >

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/TestSolrCoreProperties.java Sun Oct  9 23:45:59 2011
@@ -45,10 +45,9 @@ public class TestSolrCoreProperties exte
   public void setUp() throws Exception {
     super.setUp();
     setUpMe();
-    System.setProperty("solr.solr.home", getHomeDir());
     System.setProperty("solr.data.dir", getDataDir());
     
-    solrJetty = new JettySolrRunner("/solr", 0);
+    solrJetty = new JettySolrRunner(getHomeDir(), "/solr", 0);
 
     solrJetty.start();
     String url = "http://localhost:" + solrJetty.getLocalPort() + "/solr";

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Sun Oct  9 23:45:59 2011
@@ -84,6 +84,7 @@ public abstract class AbstractZkTestCase
 
     putConfig(zkClient, config);
     putConfig(zkClient, schema);
+    putConfig(zkClient, "solrconfig-distrib-update.xml");
     putConfig(zkClient, "stopwords.txt");
     putConfig(zkClient, "protwords.txt");
     putConfig(zkClient, "mapping-ISOLatin1Accent.txt");

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Sun Oct  9 23:45:59 2011
@@ -19,13 +19,11 @@ package org.apache.solr.cloud;
 
 import java.net.MalformedURLException;
 
-import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrServer;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.junit.BeforeClass;
 
 /**
  *
@@ -56,12 +54,6 @@ public class BasicDistributedZkTest exte
     
     System.setProperty("CLOUD_UPDATE_DELAY", "0");
   }
-
-  
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    System.setProperty("solr.solr.home", SolrTestCaseJ4.TEST_HOME());
-  }
   
   @Override
   protected void setDistributedParams(ModifiableSolrParams params) {

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Sun Oct  9 23:45:59 2011
@@ -243,18 +243,18 @@ public class CloudStateUpdateTest extend
       cloudState2 = zkController2.getCloudState();
       slices = cloudState2.getSlices("testcore");
       
-      if (slices != null && slices.containsKey(host + ":1661_solr_testcore")
-          && slices.get(host + ":1661_solr_testcore").getShards().size() > 0) {
+      if (slices != null && slices.containsKey("shard1")
+          && slices.get("shard1").getShards().size() > 0) {
         break;
       }
       Thread.sleep(500);
     }
 
     assertNotNull(slices);
-    assertTrue(slices.containsKey(host + ":1661_solr_testcore"));
+    assertTrue(slices.containsKey("shard1"));
 
-    Slice slice = slices.get(host + ":1661_solr_testcore");
-    assertEquals(host + ":1661_solr_testcore", slice.getName());
+    Slice slice = slices.get("shard1");
+    assertEquals("shard1", slice.getName());
 
     Map<String,ZkNodeProps> shards = slice.getShards();
 

Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sun Oct  9 23:45:59 2011
@@ -0,0 +1,333 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+/**
+ *
+ */
+public class FullDistributedZkTest extends AbstractDistributedZkTestCase {
+  
+  private static final String DEFAULT_COLLECTION = "collection1";
+  private static final boolean DEBUG = false;
+  String t1="a_t";
+  String i1="a_si";
+  String nint = "n_i";
+  String tint = "n_ti";
+  String nfloat = "n_f";
+  String tfloat = "n_tf";
+  String ndouble = "n_d";
+  String tdouble = "n_td";
+  String nlong = "n_l";
+  String tlong = "other_tl1";
+  String ndate = "n_dt";
+  String tdate = "n_tdt";
+  
+  String oddField="oddField_s";
+  String missingField="ignore_exception__missing_but_valid_field_t";
+  String invalidField="ignore_exception__invalid_field_not_in_schema";
+  
+  public FullDistributedZkTest() {
+    fixShardCount = true;
+    shardCount = 6; // doTest's header describes this as 3 shards, each with one replica
+    System.setProperty("CLOUD_UPDATE_DELAY", "0"); // cleared again in tearDown
+  }
+  
+  @Override
+  protected void createServers(int numShards) throws Exception {
+    System.setProperty("collection", "control_collection");
+    controlJetty = createJetty(testDir, testDir + "/control/data", "control_shard");
+    System.clearProperty("collection");
+    controlClient = createNewSolrServer(controlJetty.getLocalPort());
+
+    for (int i = 1; i <= numShards; i++) {
+      JettySolrRunner j = createJetty(testDir, testDir + "/jetty" + i, null, "solrconfig-distrib-update.xml");
+      jettys.add(j);
+      clients.add(createNewSolrServer(j.getLocalPort()));
+    }
+    
+    // build the shard string: jetty k is paired with jetty k + numShards/2,
+    // the same pairing indexDoc uses when picking a client (jettys is 0-based)
+    StringBuilder sb = new StringBuilder();
+    int half = numShards / 2;
+    for (int i = 0; i < half; i++) {
+      JettySolrRunner j = jettys.get(i);
+      JettySolrRunner j2 = jettys.get(i + half);
+      if (sb.length() > 0) sb.append(',');
+      sb.append("localhost:").append(j.getLocalPort()).append(context);
+      sb.append("|localhost:").append(j2.getLocalPort()).append(context);
+    }
+    shards = sb.toString();
+  }
+  
+  @Override
+  protected void setDistributedParams(ModifiableSolrParams params) {
+    // randomly either leave "shards" unset, so that it is figured out from
+    // the cloud state, or list shard ids rather than physical locations
+    boolean useCloudState = r.nextBoolean();
+    if (!useCloudState) {
+      int numShardIds = shardCount / 2;
+      StringBuilder shardIds = new StringBuilder();
+      for (int n = 1; n <= numShardIds; n++) {
+        if (n > 1) {
+          shardIds.append(',');
+        }
+        shardIds.append("shard").append(n);
+      }
+      params.set("shards", shardIds.toString());
+    }
+    params.set("distrib", "true");
+  }
+  
+  @Override
+  protected void indexDoc(SolrInputDocument doc) throws IOException, SolrServerException {
+    controlClient.add(doc);
+
+    boolean useSecondHalf = random.nextBoolean();
+
+    // hash the id field onto the first half of the clients, then optionally
+    // shift to the client at the same offset in the second half
+    int half = clients.size() / 2;
+    int idHash = doc.getField(id).toString().hashCode() & 0x7fffffff;
+    int target = idHash % half;
+    if (useSecondHalf) {
+      target += half;
+    }
+
+    CommonsHttpSolrServer client = (CommonsHttpSolrServer) clients.get(target);
+    UpdateRequest update = new UpdateRequest();
+    update.add(doc);
+    update.setParam("update.chain", "distrib-update-chain");
+    update.process(client);
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.solr.BaseDistributedSearchTestCase#doTest()
+   * 
+   * Indexes docs through indexr and issues the query(...) calls below; the
+   * constructor's shardCount of 6 is meant as 3 shards, each with one replica.
+   */
+  @Override
+  public void doTest() throws Exception {
+    printLayout();
+    // make sure 'shard1' was auto-assigned
+    SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT);
+    assertTrue("shard1 was not found in zk layout", zkClient.exists("/solr/collections/collection1/shards/shard1"));
+    zkClient.close();
+    
+    del("*:*");
+    indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
+            ,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
+    indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country."
+    );
+    indexr(id,3, i1, 2, tlong, 2,t1,"how now brown cow"
+    );
+    indexr(id,4, i1, -100 ,tlong, 101,t1,"the quick fox jumped over the lazy dog"
+    );
+    indexr(id,5, i1, 500, tlong, 500 ,t1,"the quick fox jumped way over the lazy dog"
+    );
+    indexr(id,6, i1, -600, tlong, 600 ,t1,"humpty dumpy sat on a wall");
+    indexr(id,7, i1, 123, tlong, 123 ,t1,"humpty dumpy had a great fall");
+    indexr(id,8, i1, 876, tlong, 876,t1,"all the kings horses and all the kings men");
+    indexr(id,9, i1, 7, tlong, 7,t1,"couldn't put humpty together again");
+    indexr(id,10, i1, 4321, tlong, 4321,t1,"this too shall pass");
+    indexr(id,11, i1, -987, tlong, 987,t1,"An eye for eye only ends up making the whole world blind.");
+    indexr(id,12, i1, 379, tlong, 379,t1,"Great works are performed, not by strength, but by perseverance.");
+    indexr(id,13, i1, 232, tlong, 232,t1,"no eggs on wall, lesson learned", oddField, "odd man out");
+
+    indexr(id, 14, "SubjectTerms_mfacet", new String[]  {"mathematical models", "mathematical analysis"});
+    indexr(id, 15, "SubjectTerms_mfacet", new String[]  {"test 1", "test 2", "test3"});
+    indexr(id, 16, "SubjectTerms_mfacet", new String[]  {"test 1", "test 2", "test3"});
+    String[] vals = new String[100];
+    for (int i=0; i<100; i++) {
+      vals[i] = "test " + i;
+    }
+    indexr(id, 17, "SubjectTerms_mfacet", vals);
+
+    for (int i=100; i<150; i++) {
+      indexr(id, i);      
+    }
+
+    commit();
+
+    handle.clear();
+    handle.put("QTime", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+
+    // random value sort
+    for (String f : fieldNames) {
+      query("q","*:*", "sort",f+" desc");
+      query("q","*:*", "sort",f+" asc");
+    }
+
+    // these queries should be exactly ordered and scores should exactly match
+    query("q","*:*", "sort",i1+" desc");
+    query("q","*:*", "sort",i1+" asc");
+    query("q","*:*", "sort",i1+" desc", "fl","*,score");
+    query("q","*:*", "sort","n_tl1 asc", "fl","score");  // test legacy behavior - "score"=="*,score"
+    query("q","*:*", "sort","n_tl1 desc");
+    handle.put("maxScore", SKIPVAL);
+    query("q","{!func}"+i1);// does not expect maxScore, so if it comes, ignore it. JavaBinCodec.writeSolrDocumentList()
+    // is agnostic of request params.
+    handle.remove("maxScore");
+    query("q","{!func}"+i1, "fl","*,score");  // even scores should match exactly here
+
+    handle.put("highlighting", UNORDERED);
+    handle.put("response", UNORDERED);
+
+    handle.put("maxScore", SKIPVAL);
+    query("q","quick");
+    query("q","all","fl","id","start","0");
+    query("q","all","fl","foofoofoo","start","0");  // no fields in returned docs
+    query("q","all","fl","id","start","100");
+
+    handle.put("score", SKIPVAL);
+    query("q","quick","fl","*,score");
+    query("q","all","fl","*,score","start","1");
+    query("q","all","fl","*,score","start","100");
+
+    query("q","now their fox sat had put","fl","*,score",
+            "hl","true","hl.fl",t1);
+
+    query("q","now their fox sat had put","fl","foofoofoo",
+            "hl","true","hl.fl",t1);
+
+    query("q","matchesnothing","fl","*,score");  
+
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1);
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","count");
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","count", "facet.mincount",2);
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","index");
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.limit",-1, "facet.sort","index", "facet.mincount",2);
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1,"facet.limit",1);
+    query("q","*:*", "rows",100, "facet","true", "facet.query","quick", "facet.query","all", "facet.query","*:*");
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.offset",1);
+    query("q","*:*", "rows",100, "facet","true", "facet.field",t1, "facet.mincount",2);
+
+    // test faceting multiple things at once
+    query("q","*:*", "rows",100, "facet","true", "facet.query","quick", "facet.query","all", "facet.query","*:*"
+    ,"facet.field",t1);
+
+    // test filter tagging, facet exclusion, and naming (multi-select facet support)
+    query("q","*:*", "rows",100, "facet","true", "facet.query","{!key=myquick}quick", "facet.query","{!key=myall ex=a}all", "facet.query","*:*"
+    ,"facet.field","{!key=mykey ex=a}"+t1
+    ,"facet.field","{!key=other ex=b}"+t1
+    ,"facet.field","{!key=again ex=a,b}"+t1
+    ,"facet.field",t1
+    ,"fq","{!tag=a}id:[1 TO 7]", "fq","{!tag=b}id:[3 TO 9]"
+    );
+    query("q", "*:*", "facet", "true", "facet.field", "{!ex=t1}SubjectTerms_mfacet", "fq", "{!tag=t1}SubjectTerms_mfacet:(test 1)", "facet.limit", "10", "facet.mincount", "1");
+
+    // test field that is valid in schema but missing in all shards
+    query("q","*:*", "rows",100, "facet","true", "facet.field",missingField, "facet.mincount",2);
+    // test field that is valid in schema and missing in some shards
+    query("q","*:*", "rows",100, "facet","true", "facet.field",oddField, "facet.mincount",2);
+
+    query("q","*:*", "sort",i1+" desc", "stats", "true", "stats.field", i1);
+
+
+    // Try to get better coverage for refinement queries by turning off over requesting.
+    // This makes it much more likely that we may not get the top facet values and hence
+    // we turn off that checking.
+    handle.put("facet_fields", SKIPVAL);    
+    query("q","*:*", "rows",0, "facet","true", "facet.field",t1,"facet.limit",5, "facet.shard.limit",5);
+    // check a complex key name
+    query("q","*:*", "rows",0, "facet","true", "facet.field","{!key='a b/c \\' \\} foo'}"+t1,"facet.limit",5, "facet.shard.limit",5);
+    handle.remove("facet_fields");
+
+
+    // index the same document to two servers and make sure things
+    // don't blow up.
+    if (clients.size()>=2) {
+      index(id,100, i1, 107 ,t1,"oh no, a duplicate!");
+      for (int i=0; i<clients.size(); i++) {
+        index_specific(i, id,100, i1, 107 ,t1,"oh no, a duplicate!");
+      }
+      commit();
+      query("q","duplicate", "hl","true", "hl.fl", t1);
+      query("q","fox duplicate horses", "hl","true", "hl.fl", t1);
+      query("q","*:*", "rows",100);
+    }
+
+    // test debugging
+    handle.put("explain", UNORDERED);
+    handle.put("debug", UNORDERED);
+    handle.put("time", SKIPVAL);
+    query("q","now their fox sat had put","fl","*,score",CommonParams.DEBUG_QUERY, "true");
+    query("q", "id:[1 TO 5]", CommonParams.DEBUG_QUERY, "true");
+    query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.TIMING);
+    query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.RESULTS);
+    query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
+
+    // TODO: This test currently fails because debug info is obtained only
+    // on shards with matches.
+    // query("q","matchesnothing","fl","*,score", "debugQuery", "true");
+
+    if (DEBUG) {
+      super.printLayout();
+    }
+  }
+
+  volatile CloudSolrServer solrj;
+
+  @Override
+  protected QueryResponse queryServer(ModifiableSolrParams params) throws SolrServerException {
+    if (r.nextBoolean())
+      return super.queryServer(params);
+
+    // use the distributed solrj client, created once (re-checked under lock)
+    if (solrj == null) {
+      synchronized(this) {
+        if (solrj == null) {
+          try {
+            CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
+            server.setDefaultCollection(DEFAULT_COLLECTION);
+            solrj = server;
+          } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+
+    if (r.nextBoolean())
+      params.set("collection",DEFAULT_COLLECTION);
+
+    return solrj.query(params);
+  }
+  
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    System.clearProperty("CLOUD_UPDATE_DELAY"); // set in the constructor
+    System.clearProperty("zkHost"); // not set in this class - presumably set by a base test case; verify
+  }
+}

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Sun Oct  9 23:45:59 2011
@@ -79,7 +79,7 @@ public class LeaderElectionTest extends 
         
         elector.setupForSlice("shard1", "collection1");
         seq = elector.joinElection("shard1", "collection1",
-            Integer.toString(nodeNumber));
+            Integer.toString(nodeNumber), null);
         seqToThread.put(seq, this);
         // run forever - we will be explicitly killed
         Thread.sleep(Integer.MAX_VALUE);
@@ -103,14 +103,14 @@ public class LeaderElectionTest extends 
     SliceLeaderElector elector = new SliceLeaderElector(zkClient1);
     
     elector.setupForSlice("shard2", "collection1");
-    elector.joinElection("shard2", "collection1", "dummynode1");
+    elector.joinElection("shard2", "collection1", "dummynode1", null);
     
     SolrZkClient zkClient2 = new SolrZkClient(server.getZkAddress(), TIMEOUT);
     
     SliceLeaderElector elector2 = new SliceLeaderElector(zkClient2);
     
     elector2.setupForSlice("shard2", "collection1");
-    elector2.joinElection("shard2", "collection1", "dummynode2");
+    elector2.joinElection("shard2", "collection1", "dummynode2", null);
     
     List<ClientThread> threads = new ArrayList<ClientThread>();
     

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Oct  9 23:45:59 2011
@@ -260,11 +260,17 @@ public class ZkControllerTest extends So
       zkController.createCollectionZkNode(cloudDesc);
      
       String shard1 = zkController.register("core1", cloudDesc);
+      cloudDesc.setShardId(null);
       String shard2 = zkController.register("core2", cloudDesc);
+      cloudDesc.setShardId(null);
       String shard3 = zkController.register("core3", cloudDesc);
+      cloudDesc.setShardId(null);
       String shard4 = zkController.register("core4", cloudDesc);
+      cloudDesc.setShardId(null);
       String shard5 = zkController.register("core5", cloudDesc);
+      cloudDesc.setShardId(null);
       String shard6 = zkController.register("core6", cloudDesc);
+      cloudDesc.setShardId(null);
 
       assertEquals("shard1", shard1);
       assertEquals("shard2", shard2);

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java Sun Oct  9 23:45:59 2011
@@ -105,10 +105,9 @@ public class TestReplicationHandler exte
   }
 
   private static JettySolrRunner createJetty(SolrInstance instance) throws Exception {
-    System.setProperty("solr.solr.home", instance.getHomeDir());
     System.setProperty("solr.data.dir", instance.getDataDir());
 
-    JettySolrRunner jetty = new JettySolrRunner("/solr", 0);
+    JettySolrRunner jetty = new JettySolrRunner(instance.getHomeDir(), "/solr", 0);
 
     jetty.start();
     return jetty;

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/schema/TestBinaryField.java Sun Oct  9 23:45:59 2011
@@ -67,12 +67,11 @@ public class TestBinaryField extends Luc
     out = new FileOutputStream(f);
     IOUtils.copy(loader.openResource(fname), out);
     out.close();
-    System.setProperty("solr.solr.home", homeDir.getAbsolutePath());
     System.setProperty("solr.data.dir", dataDir.getAbsolutePath());
     System.setProperty("solr.test.sys.prop1", "propone");
     System.setProperty("solr.test.sys.prop2", "proptwo");
 
-    jetty = new JettySolrRunner(context, 0);
+    jetty = new JettySolrRunner(homeDir.getAbsolutePath(), context, 0);
     jetty.start();
     port = jetty.getLocalPort();
 

Added: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/processor/TestDistributedUpdate.java Sun Oct  9 23:45:59 2011
@@ -0,0 +1,369 @@
+package org.apache.solr.update.processor;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+
+public class TestDistributedUpdate extends SolrTestCaseJ4 {
+  
+  private static final int NUM_JETTIES = 2;
+  
+  File testDir;
+  
+  List<SolrServer> clients = new ArrayList<SolrServer>();
+  List<JettySolrRunner> jettys = new ArrayList<JettySolrRunner>();
+  String context = "/solr/collection1";
+  String shardStr;
+  String[] shards;
+  boolean updateSelf;
+  
+  String id = "id";
+  String t1 = "a_t";
+  String i1 = "a_i";
+  String oddField = "oddField_s";
+  String missingField = "missing_but_valid_field_t";
+  String invalidField = "invalid_field_not_in_schema";
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    testDir = new File(TEMP_DIR, "distrib_update_test");
+    testDir.mkdirs();
+  }
+  
+  @Override
+  public void tearDown() throws Exception {
+    destroyServers();
+    super.tearDown();
+  }
+  
+  /** Starts NUM_JETTIES Jettys, a client and shard address for each, then empties the index. */
+  private void createServers() throws Exception {
+    StringBuilder sb = new StringBuilder();
+    shards = new String[NUM_JETTIES];
+    for (int i = 0; i < NUM_JETTIES; i++) {
+      if (sb.length() > 0) sb.append(',');
+      // 4-arg overload on purpose: the 3-arg one would take the config name as a shard id
+      JettySolrRunner jetty = createJetty(testDir, testDir + "/shard" + i
+          + "/data", null, "solrconfig-distrib-update.xml");
+      jettys.add(jetty);
+      int port = jetty.getLocalPort();
+      clients.add(createNewSolrServer(port));
+      shards[i] = "localhost:" + port + context;
+      sb.append(shards[i]);
+    }
+    shardStr = sb.toString();
+    
+    // Ensure that Solr starts with no documents
+    send(commit(u().deleteByQuery("*:*")));
+  }
+  
+  private void destroyServers() throws Exception {
+    for (JettySolrRunner jetty : jettys)
+      jetty.stop();
+    clients.clear();
+    jettys.clear();
+  }
+  
+  public JettySolrRunner createJetty(File baseDir, String dataDir)
+      throws Exception {
+    return createJetty(baseDir, dataDir, null, null);
+  }
+  
+  public JettySolrRunner createJetty(File baseDir, String dataDir,
+      String shardId) throws Exception {
+    return createJetty(baseDir, dataDir, shardId,
+        "solrconfig-distrib-update.xml");
+  }
+  
+  public JettySolrRunner createJetty(File baseDir, String dataDir,
+      String shardList, String solrConfigOverride) throws Exception {
+    System.setProperty("solr.data.dir", dataDir);
+    
+    JettySolrRunner jetty = new JettySolrRunner(TEST_HOME(), "/solr", 0, solrConfigOverride);
+    if (shardList != null) {
+      System.setProperty("shard", shardList);
+    }
+    jetty.start();
+    System.clearProperty("shard");
+    return jetty;
+  }
+  
+  protected SolrServer createNewSolrServer(int port) {
+    try {
+      // setup the server...
+      String url = "http://localhost:" + port + context;
+      CommonsHttpSolrServer s = new CommonsHttpSolrServer(url);
+      s.setConnectionTimeout(1000); // 1 sec
+      s.setDefaultMaxConnectionsPerHost(100);
+      s.setMaxTotalConnections(100);
+      return s;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
+  QueryResponse query(Object... q) throws Exception {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    
+    for (int i = 0; i < q.length; i += 2) {
+      params.add(q[i].toString(), q[i + 1].toString());
+    }
+    
+    params.set("shards", shardStr);
+    
+    // query a random server
+    int which = random.nextInt(clients.size());
+    SolrServer client = clients.get(which);
+    QueryResponse rsp = client.query(params);
+    return rsp;
+  }
+  
+  void send(int which, AbstractUpdateRequest ureq) throws Exception {
+    ureq.setParam("update.chain", "distrib-update-chain");
+    ureq.setParam("shards", shardStr);
+    ureq.setParam("self", updateSelf ? shards[which] : "foo");
+    
+    SolrServer client = clients.get(which);
+    client.request(ureq);
+  }
+  
+  SolrInputDocument doc(Object... fields) {
+    SolrInputDocument doc = new SolrInputDocument();
+    for (int i = 0; i < fields.length; i += 2) {
+      doc.addField((String) (fields[i]), fields[i + 1]);
+    }
+    return doc;
+  }
+  
+  // send request to a random server
+  void send(AbstractUpdateRequest ureq) throws Exception {
+    send((random.nextInt() >>> 1) % shards.length, ureq);
+  }
+  
+  UpdateRequest u() {
+    return new UpdateRequest();
+  }
+  
+  AbstractUpdateRequest commit(AbstractUpdateRequest ureq) {
+    ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
+    return ureq;
+  }
+  
+  UpdateRequest optimize(UpdateRequest ureq) {
+    ureq.setAction(UpdateRequest.ACTION.OPTIMIZE, true, true);
+    return ureq;
+  }
+  
+  UpdateRequest add(UpdateRequest ureq, Object... ids) {
+    for (Object id : ids) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", id.toString());
+      ureq.add(doc);
+    }
+    return ureq;
+  }
+  
+  void verifyCount(String q, int count) throws Exception {
+    verifyCount(q, count, 0);
+  }
+  
+  void verifyCount(String q, int count, int retries) throws Exception {
+    long found = query("q", q).getResults().getNumFound();
+    for (int i = 0; i < retries; i++) {
+      if (found == count) {
+        break;
+      }
+      Thread.sleep(500);
+      found = query("q", q).getResults().getNumFound();
+    }
+    
+    assertEquals(count, found);
+    // use a facet to get the "real" count since distributed search
+    // can do some dedup for us.
+    assertEquals(count, query("q", "*:*", "facet", "true", "facet.query", q)
+        .getFacetQuery().get(q).longValue());
+  }
+  
+  public void testStress() throws Exception {
+    int iter = 10; // crank this number up for a long term test
+    
+    createServers();
+    updateSelf = true;
+    
+    List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(1000);
+    for (int i = 0; i < 1000; i++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "" + i);
+      docs.add(doc);
+    }
+    
+    List<String> ids = new ArrayList<String>(1000);
+    for (int i = 0; i < 1000; i++)
+      ids.add("" + i);
+    
+    boolean haveAddedDocs = false;
+    for (int i = 0; i < iter; i++) {
+      // System.out.println("ITERATION"+i);
+      if ((random.nextInt() & 0x01) == 0) {
+        // System.out.println("ITERATION"+i+" - 0");
+        haveAddedDocs = true;
+        UpdateRequest addReq = new UpdateRequest();
+        addReq.add(docs);
+        send(commit(addReq));
+        verifyCount("id:[* TO *]", 1000);
+      } else {
+        // System.out.println("ITERATION"+i+" - 1");
+        UpdateRequest delReq = new UpdateRequest();
+        for (int j = 0; j < 1000; j += 2) {
+          delReq.deleteById(ids.get(j));
+        }
+        send(commit(delReq));
+        verifyCount("id:[* TO *]", (haveAddedDocs ? 500 : 0));
+      }
+      
+      // optimize to keep the index size under control.
+      if (i % 25 == 0) {
+        send(optimize(u()));
+      }
+      
+    }
+    
+    destroyServers();
+  }
+  
+  public void testDistribUpdate() throws Exception {
+    for (int nServers = 2; nServers < 4; nServers++) {
+      // NOTE(review): nServers is never read; createServers() always starts NUM_JETTIES
+      createServers();
+      
+      // node doesn't know who it is... sends to itself over HTTP
+      updateSelf = false;
+      doTest();
+      
+      // node does know who it is... updates index directly for itself
+      updateSelf = true;
+      doTest();
+      
+      destroyServers();
+    }
+  }
+  
+  /** Shared scenario: sends adds, deletes and commits, verifying the counts after each step. */
+  public void doTest() throws Exception {
+    send(0, commit(u().deleteByQuery("*:*")));
+    verifyCount("id:1", 0);
+    
+    send(0, add(u(), 1));
+    send(1, add(u(), 1));
+    verifyCount("id:1", 0); // no commit yet
+    send(commit(u()));
+    verifyCount("id:1", 1); // doc should only have been sent to single server
+    
+    send(u().deleteById("1"));
+    verifyCount("id:1", 1); // no commit yet
+    send(commit(u()));
+    verifyCount("id:1", 0);
+    
+    // test adding a commit onto an add
+    send(commit(add(u(), 1)));
+    verifyCount("id:1", 1);
+    
+    // test adding a commit onto a delete
+    send(commit(u().deleteById("1")));
+    verifyCount("id:1", 0);
+    
+    // test that batching adds doesn't mess anything up
+    send(add(u(), 1, 2, 3, 4, 5, 6, 7, 8, 9));
+    send(commit(u()));
+    // Thread.sleep(1000000000);
+    verifyCount("id:[1 TO 9]", 9);
+    
+    // test delete by query
+    send(commit(u().deleteByQuery("id:[2 TO 8]")));
+    verifyCount("id:[1 TO 9]", 2);
+    
+    send(commit(add(u(), 1, 2, 3, 4, 5, 6, 7, 8, 9)));
+    verifyCount("id:[1 TO 9]", 9);
+    
+    send(commit(u().deleteByQuery("*:*")));
+    verifyCount("id:[1 TO 9]", 0);
+    
+    // this test can cause failures if a commit can sneak in ahead of
+    // add requests that are still pending.
+    Object[] docs = new Object[1000];
+    for (int i = 0; i < 1000; i++)
+      docs[i] = i;
+    send(commit(add(u(), docs)));
+    verifyCount("id:[* TO *]", 1000);
+    
+    // test delete batching
+    UpdateRequest ureq = u();
+    for (int i = 0; i < 1000; i += 2) {
+      ureq.deleteById("" + i);
+    }
+    send(commit(ureq));
+    verifyCount("id:[* TO *]", 500);
+    
+    // test commit within
+    ureq = u();
+    ureq.setCommitWithin(1);
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", "999999");
+    ureq.add(doc);
+    send(ureq);
+    // no explicit commit was sent: poll (up to 300 retries, 500 ms apart) for the doc to show up
+    verifyCount("id:[* TO *]", 501, 300);
+    
+    send(commit(ureq));
+    
+    // test overwrite=false: the same id is expected to be added again, not replaced
+    UpdateRequestExt lweureq = new UpdateRequestExt();
+    doc = new SolrInputDocument();
+    doc.addField("id", "999999");
+    lweureq.add(doc, 3, false);
+    send(commit(lweureq));
+    
+    verifyCount("id:[* TO *]", 502);
+    
+    // test overwrite=false with no commitWithin (-1)
+    lweureq = new UpdateRequestExt();
+    doc = new SolrInputDocument();
+    doc.addField("id", "999999");
+    lweureq.add(doc, -1, false);
+    send(commit(lweureq));
+    
+    verifyCount("id:[* TO *]", 503);
+  }
+  
+}

Added: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java?rev=1180745&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java (added)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequestExt.java Sun Oct  9 23:45:59 2011
@@ -0,0 +1,234 @@
+package org.apache.solr.client.solrj.request;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.XML;
+
+// TODO: bake this into UpdateRequest
+public class UpdateRequestExt extends AbstractUpdateRequest {
+  
+  private List<SolrDoc> documents = null;
+  private List<String> deleteById = null;
+  private List<String> deleteQuery = null;
+  
+  private static class SolrDoc { // a queued document plus its per-document add options
+    SolrInputDocument document;
+    int commitWithin; // -1 makes writeXML fall back to the request-level commitWithin
+    boolean overwrite;
+    @Override
+    public String toString() {
+      return "SolrDoc [document=" + document + ", commitWithin=" + commitWithin
+          + ", overwrite=" + overwrite + "]";
+    }
+  }
+  
+  public UpdateRequestExt() {
+    super(METHOD.POST, "/update");
+  }
+
+  public UpdateRequestExt(String url) {
+    super(METHOD.POST, url);
+  }
+  
+  // ---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
+  
+  /**
+   * clear the pending documents and delete commands
+   */
+  public void clear() {
+    if (documents != null) {
+      documents.clear();
+    }
+    if (deleteById != null) {
+      deleteById.clear();
+    }
+    if (deleteQuery != null) {
+      deleteQuery.clear();
+    }
+  }
+  
+  // ---------------------------------------------------------------------------
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Queues a document with the default options: a per-document commitWithin
+   * of -1 (so writeXML falls back to the request-level value) and overwrite
+   * enabled.
+   * Equivalent to {@code add(doc, -1, true)}.
+   *
+   * @param doc the document to add
+   * @return this request, so calls can be chained
+   */
+  public UpdateRequestExt add(final SolrInputDocument doc) {
+    return add(doc, -1, true);
+  }
+  
+  public UpdateRequestExt add(final SolrInputDocument doc, int commitWithin,
+      boolean overwrite) {
+    if (documents == null) {
+      documents = new ArrayList<SolrDoc>(2);
+    }
+    SolrDoc solrDoc = new SolrDoc();
+    solrDoc.document = doc;
+    solrDoc.commitWithin = commitWithin;
+    solrDoc.overwrite = overwrite;
+    documents.add(solrDoc);
+    
+    return this;
+  }
+  
+  public UpdateRequestExt deleteById(String id) {
+    if (deleteById == null) {
+      deleteById = new ArrayList<String>();
+    }
+    deleteById.add(id);
+    return this;
+  }
+  
+  public UpdateRequestExt deleteById(List<String> ids) {
+    if (deleteById == null) {
+      deleteById = new ArrayList<String>(ids);
+    } else {
+      deleteById.addAll(ids);
+    }
+    return this;
+  }
+  
+  public UpdateRequestExt deleteByQuery(String q) {
+    if (deleteQuery == null) {
+      deleteQuery = new ArrayList<String>();
+    }
+    deleteQuery.add(q);
+    return this;
+  }
+  
+  // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
+  
+  @Override
+  public Collection<ContentStream> getContentStreams() throws IOException {
+    return ClientUtils.toContentStreams(getXML(), ClientUtils.TEXT_XML);
+  }
+  
+  /** Renders the pending commands via writeXML; returns null when nothing was written. */
+  public String getXML() throws IOException {
+    StringWriter buffer = new StringWriter();
+    writeXML(buffer);
+    buffer.flush();
+    
+    String rendered = buffer.toString();
+    return (rendered.length() == 0) ? null : rendered;
+  }
+  
+  /**
+   * Writes the pending commands as update XML: one {@code <add>} element per run of documents
+   * sharing commitWithin/overwrite, then a {@code <delete>} element if any deletes are queued.
+   */
+  public void writeXML(Writer writer) throws IOException {
+    List<List<SolrDoc>> getDocLists = getDocLists(documents);
+    for (List<SolrDoc> docs : getDocLists) {
+      if ((docs != null && docs.size() > 0)) {
+        SolrDoc firstDoc = docs.get(0);
+        int commitWithin = firstDoc.commitWithin != -1 ? firstDoc.commitWithin : this.commitWithin;
+        boolean overwrite = firstDoc.overwrite;
+        if (commitWithin > -1 || overwrite != true) {
+          writer.write("<add commitWithin=\"" + commitWithin + "\" " + "overwrite=\"" + overwrite + "\">");
+        } else {
+          writer.write("<add>");
+        }
+        // Only this run's documents: looping over the full list repeated every doc per <add>.
+        for (SolrDoc doc : docs) {
+          if (doc != null) {
+            ClientUtils.writeXML(doc.document, writer);
+          }
+        }
+        writer.write("</add>");
+      }
+    }
+    
+    // Add the delete commands
+    boolean deleteI = deleteById != null && deleteById.size() > 0;
+    boolean deleteQ = deleteQuery != null && deleteQuery.size() > 0;
+    if (deleteI || deleteQ) {
+      writer.append("<delete>");
+      if (deleteI) {
+        for (String id : deleteById) {
+          writer.append("<id>");
+          XML.escapeCharData(id, writer);
+          writer.append("</id>");
+        }
+      }
+      if (deleteQ) {
+        for (String q : deleteQuery) {
+          writer.append("<query>");
+          XML.escapeCharData(q, writer);
+          writer.append("</query>");
+        }
+      }
+      writer.append("</delete>");
+    }
+  }
+  
+  /** Splits the documents into consecutive runs sharing commitWithin/overwrite; null gives none. */
+  private List<List<SolrDoc>> getDocLists(List<SolrDoc> documents) {
+    List<List<SolrDoc>> docLists = new ArrayList<List<SolrDoc>>();
+    if (documents == null) {
+      return docLists;
+    }
+    boolean lastOverwrite = true;
+    int lastCommitWithin = -1;
+    List<SolrDoc> docList = null;
+    for (SolrDoc doc : documents) {
+      if (docList == null || doc.overwrite != lastOverwrite
+          || doc.commitWithin != lastCommitWithin) {
+        docList = new ArrayList<SolrDoc>();
+        docLists.add(docList);
+      }
+      docList.add(doc);
+      lastCommitWithin = doc.commitWithin;
+      lastOverwrite = doc.overwrite;
+    }
+    return docLists;
+  }
+  
+  public List<String> getDeleteById() {
+    return deleteById;
+  }
+  
+  public List<String> getDeleteQuery() {
+    return deleteQuery;
+  }
+  
+  @Override
+  public String toString() {
+    return "UpdateRequestExt [documents=" + documents + ", deleteById="
+        + deleteById + ", deleteQuery=" + deleteQuery + "]";
+  }
+  
+}

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrServer.java Sun Oct  9 23:45:59 2011
@@ -258,8 +258,7 @@ public class TestLBHttpSolrServer extend
     }
 
     public void startJetty() throws Exception {
-      jetty = new JettySolrRunner("/solr", port);
-      System.setProperty("solr.solr.home", getHomeDir());
+      jetty = new JettySolrRunner(getHomeDir(), "/solr", port);
       System.setProperty("solr.data.dir", getDataDir());
       jetty.start();
       int newPort = jetty.getLocalPort();

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java?rev=1180745&r1=1180744&r2=1180745&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/MultiCoreExampleJettyTest.java Sun Oct  9 23:45:59 2011
@@ -53,7 +53,7 @@ public class MultiCoreExampleJettyTest e
     System.clearProperty("solr.directoryFactory");
     super.setUp();
 
-    jetty = new JettySolrRunner( context, 0 );
+    jetty = new JettySolrRunner(getSolrHome(), context, 0 );
     jetty.start(false);
     port = jetty.getLocalPort();