Posted to commits@lucene.apache.org by rm...@apache.org on 2012/01/25 21:32:53 UTC

svn commit: r1235919 [10/12] - in /lucene/dev/branches/lucene3661: ./ dev-tools/eclipse/ dev-tools/idea/lucene/contrib/ dev-tools/maven/ dev-tools/maven/solr/core/ dev-tools/maven/solr/solrj/ lucene/ lucene/contrib/ lucene/contrib/sandbox/src/test/org/...

Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Wed Jan 25 20:32:44 2012
@@ -17,14 +17,6 @@
 package org.apache.solr.search;
 
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
@@ -33,17 +25,32 @@ import org.apache.lucene.index.*;
 import org.apache.lucene.search.*;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util._TestUtil;
 import org.apache.noggit.ObjectBuilder;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import static org.apache.solr.core.SolrCore.verbose;
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER;
 
 public class TestRealTimeGet extends SolrTestCaseJ4 {
+  private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica)
+
 
   @BeforeClass
   public static void beforeClass() throws Exception {
@@ -59,10 +66,26 @@ public class TestRealTimeGet extends Sol
     assertJQ(req("q","id:1")
         ,"/response/numFound==0"
     );
-    assertJQ(req("qt","/get","id","1")
+    assertJQ(req("qt","/get", "id","1", "fl","id")
         ,"=={'doc':{'id':'1'}}"
     );
-    assertJQ(req("qt","/get","ids","1")
+    assertJQ(req("qt","/get","ids","1", "fl","id")
+        ,"=={" +
+        "  'response':{'numFound':1,'start':0,'docs':[" +
+        "      {" +
+        "        'id':'1'}]" +
+        "  }}}"
+    );
+
+    assertU(commit());
+
+    assertJQ(req("q","id:1")
+        ,"/response/numFound==1"
+    );
+    assertJQ(req("qt","/get","id","1", "fl","id")
+        ,"=={'doc':{'id':'1'}}"
+    );
+    assertJQ(req("qt","/get","ids","1", "fl","id")
         ,"=={" +
         "  'response':{'numFound':1,'start':0,'docs':[" +
         "      {" +
@@ -70,127 +93,939 @@ public class TestRealTimeGet extends Sol
         "  }}}"
     );
 
-    assertU(commit());
+    assertU(delI("1"));
+
+    assertJQ(req("q","id:1")
+        ,"/response/numFound==1"
+    );
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':null}"
+    );
+    assertJQ(req("qt","/get","ids","1")
+        ,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
+    );
+
+
+    assertU(adoc("id","10"));
+    assertU(adoc("id","11"));
+    assertJQ(req("qt","/get","id","10", "fl","id")
+        ,"=={'doc':{'id':'10'}}"
+    );
+    assertU(delQ("id:10 abcdef"));
+    assertJQ(req("qt","/get","id","10")
+        ,"=={'doc':null}"
+    );
+    assertJQ(req("qt","/get","id","11", "fl","id")
+        ,"=={'doc':{'id':'11'}}"
+    );
+
+
+  }
+
+
+  @Test
+  public void testVersions() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    long version = addAndGetVersion(sdoc("id","1") , null);
+
+    assertJQ(req("q","id:1")
+        ,"/response/numFound==0"
+    );
+
+    // test version is there from rtg
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+    );
+
+    // test version is there from the index
+    assertU(commit());
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+    );
+
+    // simulate an update from the leader
+    version += 10;
+    updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+    // test version is there from rtg
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+    );
+
+    // simulate reordering: test that an update with a lower version does not take effect
+    updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+    // test that version hasn't changed
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+    );
+
+    // simulate reordering: test that a delete w/ a lower version does not take effect
+    // TODO: also allow passing version on delete instead of on URL?
+    updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
+
+    // test that version hasn't changed
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+    );
+
+    // make sure reordering detection also works after a commit
+    assertU(commit());
+
+    // simulate reordering: test that an update with a lower version does not take effect
+    updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+    // test that version hasn't changed
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+    );
+
+    // simulate reordering: test that a delete w/ a lower version does not take effect
+    updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
+
+    // test that version hasn't changed
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+    );
+
+    // now simulate a normal delete from the leader
+    version += 5;
+    updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version)));
+
+    // make sure a reordered add doesn't take effect.
+    updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+    // test that it's still deleted
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':null}"
+    );
+
+    // test that we can remember the version of a delete after a commit
+    assertU(commit());
+
+    // make sure a reordered add doesn't take effect.
+    updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+    // test that it's still deleted
+    assertJQ(req("qt","/get","id","1")
+        ,"=={'doc':null}"
+    );
+
+    version = addAndGetVersion(sdoc("id","2"), null);
+    long version2 = deleteByQueryAndGetVersion("id:2", null);
+    assertTrue(Math.abs(version2) > version );
+    
+  }
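
[The version rules this test pins down reduce to one invariant: adds carry positive versions, deletes carry negative ones, and an update only wins if its absolute version beats whatever was applied before. A minimal, self-contained sketch of that rule, an illustration only, not code from this patch:]

    public class VersionedCellDemo {
      // One document's state under the reorder rule: versions are signed,
      // positive for adds and negative for deletes, and an incoming update
      // is dropped unless its absolute version beats the one already applied.
      static final class Cell {
        long version;   // 0 means nothing applied yet
        Long value;     // null when deleted or unset

        synchronized boolean apply(long newVersion, Long newValue) {
          if (Math.abs(newVersion) <= Math.abs(version)) return false; // reordered: drop
          version = newVersion;
          value = newVersion > 0 ? newValue : null;
          return true;
        }
      }

      public static void main(String[] args) {
        Cell cell = new Cell();
        System.out.println(cell.apply(10, 1L));    // true:  add at version 10
        System.out.println(cell.apply(9, 2L));     // false: reordered older add, ignored
        System.out.println(cell.apply(-15, null)); // true:  delete at version 15
        System.out.println(cell.apply(12, 3L));    // false: add reordered after the delete
      }
    }
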
+
+
+
+  /***
+  @Test
+  public void testGetRealtime() throws Exception {
+    SolrQueryRequest sr1 = req("q","foo");
+    IndexReader r1 = sr1.getCore().getRealtimeReader();
+
+    assertU(adoc("id","1"));
+
+    IndexReader r2 = sr1.getCore().getRealtimeReader();
+    assertNotSame(r1, r2);
+    int refcount = r2.getRefCount();
+
+    // make sure a new reader wasn't opened
+    IndexReader r3 = sr1.getCore().getRealtimeReader();
+    assertSame(r2, r3);
+    assertEquals(refcount+1, r3.getRefCount());
+
+    assertU(commit());
+
+    // this is not critical, but currently a commit does not refresh the reader
+    // if nothing has changed
+    IndexReader r4 = sr1.getCore().getRealtimeReader();
+    assertEquals(refcount+2, r4.getRefCount());
+
+
+    r1.decRef();
+    r2.decRef();
+    r3.decRef();
+    r4.decRef();
+    sr1.close();
+  }
+  ***/
+
+
+  final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<Integer,DocInfo>();
+  Map<Integer,DocInfo> committedModel = new HashMap<Integer,DocInfo>();
+  long snapshotCount;
+  long committedModelClock;
+  volatile int lastId;
+  final String field = "val_l";
+  Object[] syncArr;
+
+  private void initModel(int ndocs) {
+    snapshotCount = 0;
+    committedModelClock = 0;
+    lastId = 0;
+
+    syncArr = new Object[ndocs];
+
+    for (int i=0; i<ndocs; i++) {
+      model.put(i, new DocInfo(0, -1L));
+      syncArr[i] = new Object();
+    }
+    committedModel.putAll(model);
+  }
+
+
+  static class DocInfo {
+    long version;
+    long val;
+
+    public DocInfo(long version, long val) {
+      this.version = version;
+      this.val = val;
+    }
+
+    public String toString() {
+      return "{version="+version+",val="+val+"\"";
+    }
+  }
+
+  @Test
+  public void testStressGetRealtime() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out);
+
+    final int commitPercent = 5 + random.nextInt(20);
+    final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+    final int deletePercent = 4+random.nextInt(25);
+    final int deleteByQueryPercent = 1+random.nextInt(5);
+    final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+    int nWriteThreads = 5 + random.nextInt(25);
+
+    final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
+
+        // query variables
+    final int percentRealtimeQuery = 60;
+    final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
+    int nReadThreads = 5 + random.nextInt(25);
+
+
+    verbose("commitPercent=", commitPercent);
+    verbose("softCommitPercent=",softCommitPercent);
+    verbose("deletePercent=",deletePercent);
+    verbose("deleteByQueryPercent=", deleteByQueryPercent);
+    verbose("ndocs=", ndocs);
+    verbose("nWriteThreads=", nWriteThreads);
+    verbose("nReadThreads=", nReadThreads);
+    verbose("percentRealtimeQuery=", percentRealtimeQuery);
+    verbose("maxConcurrentCommits=", maxConcurrentCommits);
+    verbose("operations=", operations);
+
+
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+          while (operations.get() > 0) {
+            int oper = rand.nextInt(100);
+
+            if (oper < commitPercent) {
+              if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                Map<Integer,DocInfo> newCommittedModel;
+                long version;
+
+                synchronized(TestRealTimeGet.this) {
+                  newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
+                  version = snapshotCount++;
+                  verbose("took snapshot version=",version);
+                }
+
+                if (rand.nextInt(100) < softCommitPercent) {
+                  verbose("softCommit start");
+                  assertU(h.commit("softCommit","true"));
+                  verbose("softCommit end");
+                } else {
+                  verbose("hardCommit start");
+                  assertU(commit());
+                  verbose("hardCommit end");
+                }
+
+                synchronized(TestRealTimeGet.this) {
+                  // install this model snapshot only if it's newer than the current one
+                  if (version >= committedModelClock) {
+                    if (VERBOSE) {
+                      verbose("installing new committedModel version="+committedModelClock);
+                    }
+                    committedModel = newCommittedModel;
+                    committedModelClock = version;
+                  }
+                }
+              }
+              numCommitting.decrementAndGet();
+              continue;
+            }
+
+
+            int id = rand.nextInt(ndocs);
+            Object sync = syncArr[id];
+
+            // sometimes set the lastId before we actually change it, to try to
+            // uncover more race conditions between writing and reading
+            boolean before = rand.nextBoolean();
+            if (before) {
+              lastId = id;
+            }
+
+            // We can't concurrently update the same document and retain our invariants of increasing values
+            // since we can't guarantee what order the updates will be executed.
+            // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+            synchronized (sync) {
+              DocInfo info = model.get(id);
+
+              long val = info.val;
+              long nextVal = Math.abs(val)+1;
+
+              if (oper < commitPercent + deletePercent) {
+                if (VERBOSE) {
+                  verbose("deleting id",id,"val=",nextVal);
+                }
+
+                // assertU("<delete><id>" + id + "</id></delete>");
+                Long version = deleteAndGetVersion(Integer.toString(id), null);
+
+                model.put(id, new DocInfo(version, -nextVal));
+                if (VERBOSE) {
+                  verbose("deleting id", id, "val=",nextVal,"DONE");
+                }
+              } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                if (VERBOSE) {
+                  verbose("deleteByQuery id ",id, "val=",nextVal);
+                }
+
+                assertU("<delete><query>id:" + id + "</query></delete>");
+                model.put(id, new DocInfo(-1L, -nextVal));
+                if (VERBOSE) {
+                  verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
+                }
+              } else {
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal);
+                }
+
+                // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)), null);
+                model.put(id, new DocInfo(version, nextVal));
+
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal,"DONE");
+                }
+
+              }
+            }   // end sync
+
+            if (!before) {
+              lastId = id;
+            }
+          }
+        } catch (Throwable e) {
+          operations.set(-1L);
+          SolrException.log(log, e);
+          fail(e.getMessage());
+        }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+              DocInfo info;
+
+              if (realTime) {
+                info = model.get(id);
+              } else {
+                synchronized(TestRealTimeGet.this) {
+                  info = committedModel.get(id);
+                }
+              }
+
+              if (VERBOSE) {
+                verbose("querying id", id);
+              }
+              SolrQueryRequest sreq;
+              if (realTime) {
+                sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+              } else {
+                sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              }
+
+              String response = h.query(sreq);
+              Map rsp = (Map)ObjectBuilder.fromJSON(response);
+              List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+              if (doclist.size() == 0) {
+                // there's no info we can get back with a delete, so not much we can check without further synchronization
+              } else {
+                assertEquals(1, doclist.size());
+                long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+                long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+                if (foundVal < Math.abs(info.val)
+                    || (foundVer == info.version && foundVal != info.val) ) {    // if the version matches, the val must match
+                  verbose("ERROR, id=", id, "found=",response,"model",info);
+                  assertTrue(false);
+                }
+              }
+            }
+          }
+          catch (Throwable e) {
+            operations.set(-1L);
+            SolrException.log(log, e);
+            fail(e.getMessage());
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+  }
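
[The committer logic in the stress test above follows a snapshot-and-clock pattern: copy the live model and take a ticket under the lock, run the slow commit with no lock held, then install the copy only if no newer snapshot got installed in the meantime, so a slow commit can never roll the committed view backwards. A sketch of just that pattern; the names are mine, not the patch's, and any Runnable stands in for the real soft/hard commit:]

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SnapshotInstall {
      private final Map<Integer, Long> model = new ConcurrentHashMap<Integer, Long>();
      private Map<Integer, Long> committedModel = new HashMap<Integer, Long>();
      private long snapshotCount;
      private long committedModelClock;

      public void commitAndInstall(Runnable doCommit) {
        Map<Integer, Long> snapshot;
        long version;
        synchronized (this) {
          snapshot = new HashMap<Integer, Long>(model); // consistent copy of the live model
          version = snapshotCount++;                    // ticket that orders installs
        }
        doCommit.run(); // the slow soft/hard commit runs with no lock held
        synchronized (this) {
          if (version >= committedModelClock) { // a slower, older commit must not
            committedModel = snapshot;          // roll the committed view back
            committedModelClock = version;
          }
        }
      }
    }
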
+
+
+  // This version doesn't synchronize on the id to tell which update won, but instead uses versions
+  @Test
+  public void testStressGetRealtimeVersions() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    final int commitPercent = 5 + random.nextInt(20);
+    final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+    final int deletePercent = 4+random.nextInt(25);
+    final int deleteByQueryPercent = 1 + random.nextInt(5);
+    final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+    int nWriteThreads = 5 + random.nextInt(25);
+
+    final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
+
+        // query variables
+    final int percentRealtimeQuery = 75;
+    final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
+    int nReadThreads = 5 + random.nextInt(25);
+
+
+
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+          while (operations.get() > 0) {
+            int oper = rand.nextInt(100);
+
+            if (oper < commitPercent) {
+              if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                Map<Integer,DocInfo> newCommittedModel;
+                long version;
+
+                synchronized(TestRealTimeGet.this) {
+                  newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
+                  version = snapshotCount++;
+                }
+
+                if (rand.nextInt(100) < softCommitPercent) {
+                  verbose("softCommit start");
+                  assertU(h.commit("softCommit","true"));
+                  verbose("softCommit end");
+                } else {
+                  verbose("hardCommit start");
+                  assertU(commit());
+                  verbose("hardCommit end");
+                }
+
+                synchronized(TestRealTimeGet.this) {
+                  // install this model snapshot only if it's newer than the current one
+                  if (version >= committedModelClock) {
+                    if (VERBOSE) {
+                      verbose("installing new committedModel version="+committedModelClock);
+                    }
+                    committedModel = newCommittedModel;
+                    committedModelClock = version;
+                  }
+                }
+              }
+              numCommitting.decrementAndGet();
+              continue;
+            }
+
+
+            int id = rand.nextInt(ndocs);
+            Object sync = syncArr[id];
+
+            // sometimes set the lastId before we actually change it, to try to
+            // uncover more race conditions between writing and reading
+            boolean before = rand.nextBoolean();
+            if (before) {
+              lastId = id;
+            }
+
+            // We can't concurrently update the same document and retain our invariants of increasing values
+            // since we can't guarantee what order the updates will be executed.
+            // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+            //
+            // NOTE: versioning means we can now remove the sync and tell which update "won"
+            // synchronized (sync) {
+              DocInfo info = model.get(id);
+
+              long val = info.val;
+              long nextVal = Math.abs(val)+1;
+
+              if (oper < commitPercent + deletePercent) {
+                verbose("deleting id",id,"val=",nextVal);
+
+                Long version = deleteAndGetVersion(Integer.toString(id), null);
+                assertTrue(version < 0);
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
+
+                verbose("deleting id", id, "val=",nextVal,"DONE");
+              } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                verbose("deleteByQyery id",id,"val=",nextVal);
+
+                Long version = deleteByQueryAndGetVersion("id:"+Integer.toString(id), null);
+                assertTrue(version < 0);
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
+
+                verbose("deleteByQyery id", id, "val=",nextVal,"DONE");
+              } else {
+                verbose("adding id", id, "val=", nextVal);
+
+                // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)), null);
+                assertTrue(version > 0);
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (version > currInfo.version) {
+                    model.put(id, new DocInfo(version, nextVal));
+                  }
+                }
+
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal,"DONE");
+                }
+
+              }
+            // }   // end sync
+
+            if (!before) {
+              lastId = id;
+            }
+          }
+        } catch (Throwable e) {
+          operations.set(-1L);
+          SolrException.log(log, e);
+          fail(e.getMessage());
+        }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+              DocInfo info;
+
+              if (realTime) {
+                info = model.get(id);
+              } else {
+                synchronized(TestRealTimeGet.this) {
+                  info = committedModel.get(id);
+                }
+              }
+
+              if (VERBOSE) {
+                verbose("querying id", id);
+              }
+              SolrQueryRequest sreq;
+              if (realTime) {
+                sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+              } else {
+                sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              }
+
+              String response = h.query(sreq);
+              Map rsp = (Map)ObjectBuilder.fromJSON(response);
+              List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+              if (doclist.size() == 0) {
+                // there's no info we can get back with a delete, so not much we can check without further synchronization
+              } else {
+                assertEquals(1, doclist.size());
+                long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+                long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+                if (foundVer < Math.abs(info.version)
+                    || (foundVer == info.version && foundVal != info.val) ) {    // if the version matches, the val must match
+                  verbose("ERROR, id=", id, "found=",response,"model",info);
+                  assertTrue(false);
+                }
+              }
+            }
+          }
+          catch (Throwable e) {
+            operations.set(-1L);
+            SolrException.log(log, e);
+            fail(e.getMessage());
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+  }
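
[The reader threads above allow the index to be ahead of the model snapshot they read, since writers touch the index before the model, but never behind it, and when the versions are equal the values must agree. That invariant, isolated into a sketch; the class and method names are hypothetical, not from the patch:]

    public class ModelCheck {
      /** Fails if the indexed doc is older than the model snapshot we read,
       *  or if it claims the same version but carries a different value. */
      static void checkAgainstModel(long foundVal, long foundVer,
                                    long modelVal, long modelVer) {
        if (foundVer < Math.abs(modelVer)) {
          throw new AssertionError("index behind model: " + foundVer
              + " < " + Math.abs(modelVer));
        }
        if (foundVer == modelVer && foundVal != modelVal) {
          throw new AssertionError("version " + foundVer
              + " matches but val differs: " + foundVal + " != " + modelVal);
        }
      }
    }
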
+
+  // This version simulates updates coming from the leader and sometimes being reordered
+  @Test
+  public void testStressReorderVersions() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    final int commitPercent = 5 + random.nextInt(20);
+    final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+    final int deletePercent = 4+random.nextInt(25);
+    final int deleteByQueryPercent = 0;  // delete-by-query can't be reordered on replicas
+    final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+    int nWriteThreads = 5 + random.nextInt(25);
+
+    final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
+
+        // query variables
+    final int percentRealtimeQuery = 75;
+    final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
+    int nReadThreads = 5 + random.nextInt(25);
+
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+
+    final AtomicLong testVersion = new AtomicLong(0);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+          while (operations.get() > 0) {
+            int oper = rand.nextInt(100);
+
+            if (oper < commitPercent) {
+              if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                Map<Integer,DocInfo> newCommittedModel;
+                long version;
+
+                synchronized(TestRealTimeGet.this) {
+                  newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
+                  version = snapshotCount++;
+                }
+
+                if (rand.nextInt(100) < softCommitPercent) {
+                  verbose("softCommit start");
+                  assertU(h.commit("softCommit","true"));
+                  verbose("softCommit end");
+                } else {
+                  verbose("hardCommit start");
+                  assertU(commit());
+                  verbose("hardCommit end");
+                }
+
+                synchronized(TestRealTimeGet.this) {
+                  // install this model snapshot only if it's newer than the current one
+                  if (version >= committedModelClock) {
+                    if (VERBOSE) {
+                      verbose("installing new committedModel version="+committedModelClock);
+                    }
+                    committedModel = newCommittedModel;
+                    committedModelClock = version;
+                  }
+                }
+              }
+              numCommitting.decrementAndGet();
+              continue;
+            }
+
+
+            int id;
+
+            if (rand.nextBoolean()) {
+              id = rand.nextInt(ndocs);
+            } else {
+              id = lastId;  // reuse the last ID half of the time to force more race conditions
+            }
+
+            // sometimes set the lastId before we actually change it, to try to
+            // uncover more race conditions between writing and reading
+            boolean before = rand.nextBoolean();
+            if (before) {
+              lastId = id;
+            }
+
+            DocInfo info = model.get(id);
 
-    assertJQ(req("q","id:1")
-        ,"/response/numFound==1"
-    );
-    assertJQ(req("qt","/get","id","1")
-        ,"=={'doc':{'id':'1'}}"
-    );
-    assertJQ(req("qt","/get","ids","1")
-        ,"=={" +
-        "  'response':{'numFound':1,'start':0,'docs':[" +
-        "      {" +
-        "        'id':'1'}]" +
-        "  }}}"
-    );
+            long val = info.val;
+            long nextVal = Math.abs(val)+1;
 
-    assertU(delI("1"));
+            // the version we set on the update should determine who wins
+            // These versions are not derived from the actual leader update handler, and hence this
+            // test may need to change depending on how we handle version numbers.
+            long version = testVersion.incrementAndGet();
 
-    assertJQ(req("q","id:1")
-        ,"/response/numFound==1"
-    );
-    assertJQ(req("qt","/get","id","1")
-        ,"=={'doc':null}"
-    );
-    assertJQ(req("qt","/get","ids","1")
-        ,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
-    );
+            // yield after getting the next version to increase the odds of updates happening out of order
+            if (rand.nextBoolean()) Thread.yield();
 
-  }
+              if (oper < commitPercent + deletePercent) {
+                verbose("deleting id",id,"val=",nextVal,"version",version);
 
+                Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
 
-  /***
-  @Test
-  public void testGetRealtime() throws Exception {
-    SolrQueryRequest sr1 = req("q","foo");
-    IndexReader r1 = sr1.getCore().getRealtimeReader();
+                // TODO: returning versions for these types of updates is redundant
+                // but if we do return, they had better be equal
+                if (returnedVersion != null) {
+                  assertEquals(-version, returnedVersion.longValue());
+                }
 
-    assertU(adoc("id","1"));
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
 
-    IndexReader r2 = sr1.getCore().getRealtimeReader();
-    assertNotSame(r1, r2);
-    int refcount = r2.getRefCount();
+                verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
+              } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
 
-    // make sure a new reader wasn't opened
-    IndexReader r3 = sr1.getCore().getRealtimeReader();
-    assertSame(r2, r3);
-    assertEquals(refcount+1, r3.getRefCount());
+              } else {
+                verbose("adding id", id, "val=", nextVal,"version",version);
 
-    assertU(commit());
+                Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
+                if (returnedVersion != null) {
+                  assertEquals(version, returnedVersion.longValue());
+                }
 
-    // this is not critical, but currently a commit does not refresh the reader
-    // if nothing has changed
-    IndexReader r4 = sr1.getCore().getRealtimeReader();
-    assertEquals(refcount+2, r4.getRefCount());
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (version > currInfo.version) {
+                    model.put(id, new DocInfo(version, nextVal));
+                  }
+                }
 
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
+                }
 
-    r1.decRef();
-    r2.decRef();
-    r3.decRef();
-    r4.decRef();
-    sr1.close();
-  }
-  ***/
+              }
+            // }   // end sync
+
+            if (!before) {
+              lastId = id;
+            }
+          }
+        } catch (Throwable e) {
+          operations.set(-1L);
+          SolrException.log(log, e);
+          fail(e.getMessage());
+        }
+        }
+      };
 
+      threads.add(thread);
+    }
 
-  final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
-  Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
-  long snapshotCount;
-  long committedModelClock;
-  volatile int lastId;
-  final String field = "val_l";
-  Object[] syncArr;
 
-  private void initModel(int ndocs) {
-    snapshotCount = 0;
-    committedModelClock = 0;
-    lastId = 0;
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
 
-    syncArr = new Object[ndocs];
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
 
-    for (int i=0; i<ndocs; i++) {
-      model.put(i, -1L);
-      syncArr[i] = new Object();
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+              DocInfo info;
+
+              if (realTime) {
+                info = model.get(id);
+              } else {
+                synchronized(TestRealTimeGet.this) {
+                  info = committedModel.get(id);
+                }
+              }
+
+              if (VERBOSE) {
+                verbose("querying id", id);
+              }
+              SolrQueryRequest sreq;
+              if (realTime) {
+                sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+              } else {
+                sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              }
+
+              String response = h.query(sreq);
+              Map rsp = (Map)ObjectBuilder.fromJSON(response);
+              List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+              if (doclist.size() == 0) {
+                // there's no info we can get back with a delete, so not much we can check without further synchronization
+              } else {
+                assertEquals(1, doclist.size());
+                long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+                long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+                if (foundVer < Math.abs(info.version)
+                    || (foundVer == info.version && foundVal != info.val) ) {    // if the version matches, the val must match
+                  verbose("ERROR, id=", id, "found=",response,"model",info);
+                  assertTrue(false);
+                }
+              }
+            }
+          }
+          catch (Throwable e) {
+            operations.set(-1L);
+            SolrException.log(log, e);
+            fail(e.getMessage());
+          }
+        }
+      };
+
+      threads.add(thread);
     }
-    committedModel.putAll(model);
+
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
   }
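
[The reordering in this test is manufactured rather than observed: versions are handed out in order from a single AtomicLong, but a writer may yield between taking its version and delivering the update, so a lower version can arrive after a higher one. A self-contained sketch of the trick, not part of the patch:]

    import java.util.Random;
    import java.util.concurrent.atomic.AtomicLong;

    public class ReorderDemo {
      static final AtomicLong clock = new AtomicLong();

      public static void main(String[] args) throws InterruptedException {
        Runnable writer = new Runnable() {
          final Random rand = new Random();
          public void run() {
            for (int i = 0; i < 5; i++) {
              long version = clock.incrementAndGet(); // leader-style monotonic version
              if (rand.nextBoolean()) Thread.yield(); // widen the reorder window
              // delivery order may now disagree with version order
              System.out.println(Thread.currentThread().getName()
                  + " delivered version " + version);
            }
          }
        };
        Thread a = new Thread(writer, "writer-A");
        Thread b = new Thread(writer, "writer-B");
        a.start(); b.start();
        a.join(); b.join();
      }
    }
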
 
+  // This points to the live model when state is ACTIVE, but a snapshot of the
+  // past when recovering.
+  volatile ConcurrentHashMap<Integer,DocInfo> visibleModel;
+
+  // This version simulates updates coming from the leader and sometimes being reordered
+  // and tests the ability to buffer updates and apply them later
   @Test
-  public void testStressGetRealtime() throws Exception {
+  public void testStressRecovery() throws Exception {
     clearIndex();
     assertU(commit());
 
-    // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out);
-
-    final int commitPercent = 5 + random.nextInt(20);
+    final int commitPercent = 5 + random.nextInt(10);
     final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
     final int deletePercent = 4+random.nextInt(25);
     final int deleteByQueryPercent = 0;  // real-time get isn't currently supported with delete-by-query
     final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
-    int nWriteThreads = 5 + random.nextInt(25);
+    int nWriteThreads = 2 + random.nextInt(10);  // fewer write threads to give recovery thread more of a chance
 
     final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
 
         // query variables
-    final int percentRealtimeQuery = 60;
-    // final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
-    final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
-    int nReadThreads = 5 + random.nextInt(25);
-
-
-    verbose("commitPercent=", commitPercent);
-    verbose("softCommitPercent=",softCommitPercent);
-    verbose("deletePercent=",deletePercent);
-    verbose("deleteByQueryPercent=", deleteByQueryPercent);
-    verbose("ndocs=", ndocs);
-    verbose("nWriteThreads=", nWriteThreads);
-    verbose("nReadThreads=", nReadThreads);
-    verbose("percentRealtimeQuery=", percentRealtimeQuery);
-    verbose("maxConcurrentCommits=", maxConcurrentCommits);
-    verbose("operations=", operations);
-
+    final int percentRealtimeQuery = 75;
+    final AtomicLong operations = new AtomicLong(atLeast(75));  // number of recovery loops to perform
+    int nReadThreads = 2 + random.nextInt(10);  // fewer read threads to give writers more of a chance
 
     initModel(ndocs);
 
@@ -198,39 +1033,63 @@ public class TestRealTimeGet extends Sol
 
     List<Thread> threads = new ArrayList<Thread>();
 
+
+    final AtomicLong testVersion = new AtomicLong(0);
+
+
+    final UpdateHandler uHandler = h.getCore().getUpdateHandler();
+    final UpdateLog uLog = uHandler.getUpdateLog();
+    final VersionInfo vInfo = uLog.getVersionInfo();
+    final Object stateChangeLock = new Object();
+    this.visibleModel = model;
+    final Semaphore[] writePermissions = new Semaphore[nWriteThreads];
+    for (int i=0; i<nWriteThreads; i++) writePermissions[i] = new Semaphore(Integer.MAX_VALUE, false);
+
+    final Semaphore readPermission = new Semaphore(Integer.MAX_VALUE, false);
+
     for (int i=0; i<nWriteThreads; i++) {
+      final int threadNum = i;
+
       Thread thread = new Thread("WRITER"+i) {
         Random rand = new Random(random.nextInt());
+        Semaphore writePermission = writePermissions[threadNum];
 
         @Override
         public void run() {
           try {
           while (operations.get() > 0) {
-            int oper = rand.nextInt(100);
+            writePermission.acquire();
+
+            int oper = rand.nextInt(10);
 
             if (oper < commitPercent) {
               if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
-                Map<Integer,Long> newCommittedModel;
+                Map<Integer,DocInfo> newCommittedModel;
                 long version;
 
                 synchronized(TestRealTimeGet.this) {
-                  newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
+                  newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
                   version = snapshotCount++;
-                  verbose("took snapshot version=",version);
                 }
 
-                if (rand.nextInt(100) < softCommitPercent) {
-                  verbose("softCommit start");
-                  assertU(h.commit("softCommit","true"));
-                  verbose("softCommit end");
-                } else {
-                  verbose("hardCommit start");
-                  assertU(commit());
-                  verbose("hardCommit end");
+                synchronized (stateChangeLock) {
+                  // These commits won't take effect if we are in recovery mode,
+                  // so change the version to -1 so we won't update our model.
+                  if (uLog.getState() != UpdateLog.State.ACTIVE) version = -1;
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    verbose("softCommit start");
+                    assertU(h.commit("softCommit","true"));
+                    verbose("softCommit end");
+                  } else {
+                    verbose("hardCommit start");
+                    assertU(commit());
+                    verbose("hardCommit end");
+                  }
                 }
 
                 synchronized(TestRealTimeGet.this) {
                   // install this model snapshot only if it's newer than the current one
+                  // install this model only if we are not in recovery mode.
                   if (version >= committedModelClock) {
                     if (VERBOSE) {
                       verbose("installing new committedModel version="+committedModelClock);
@@ -245,8 +1104,13 @@ public class TestRealTimeGet extends Sol
             }
 
 
-            int id = rand.nextInt(ndocs);
-            Object sync = syncArr[id];
+            int id;
+
+            if (rand.nextBoolean()) {
+              id = rand.nextInt(ndocs);
+            } else {
+              id = lastId;  // reuse the last ID half of the time to force more race conditions
+            }
 
            // sometimes set the lastId before we actually change it, to try to
             // uncover more race conditions between writing and reading
@@ -255,46 +1119,63 @@ public class TestRealTimeGet extends Sol
               lastId = id;
             }
 
-            // We can't concurrently update the same document and retain our invariants of increasing values
-            // since we can't guarantee what order the updates will be executed.
-            synchronized (sync) {
-              Long val = model.get(id);
-              long nextVal = Math.abs(val)+1;
+            DocInfo info = model.get(id);
+
+            long val = info.val;
+            long nextVal = Math.abs(val)+1;
+
+            // the version we set on the update should determine who wins
+            // These versions are not derived from the actual leader update handler, and hence this
+            // test may need to change depending on how we handle version numbers.
+            long version = testVersion.incrementAndGet();
+
+            // yield after getting the next version to increase the odds of updates happening out of order
+            if (rand.nextBoolean()) Thread.yield();
 
               if (oper < commitPercent + deletePercent) {
-                if (VERBOSE) {
-                  verbose("deleting id",id,"val=",nextVal);
+                verbose("deleting id",id,"val=",nextVal,"version",version);
+
+                Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
+
+                // TODO: returning versions for these types of updates is redundant
+                // but if we do return, they had better be equal
+                if (returnedVersion != null) {
+                  assertEquals(-version, returnedVersion.longValue());
                 }
 
-                assertU("<delete><id>" + id + "</id></delete>");
-                model.put(id, -nextVal);
-                if (VERBOSE) {
-                  verbose("deleting id", id, "val=",nextVal,"DONE");
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
                 }
+
+                verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
               } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
-                if (VERBOSE) {
-                  verbose("deleteByQuery id ",id, "val=",nextVal);
-                }
 
-                assertU("<delete><query>id:" + id + "</query></delete>");
-                model.put(id, -nextVal);
-                if (VERBOSE) {
-                  verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
-                }
               } else {
-                if (VERBOSE) {
-                  verbose("adding id", id, "val=", nextVal);
+                verbose("adding id", id, "val=", nextVal,"version",version);
+
+                Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
+                if (returnedVersion != null) {
+                  assertEquals(version, returnedVersion.longValue());
                 }
 
-                assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
-                model.put(id, nextVal);
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (version > currInfo.version) {
+                    model.put(id, new DocInfo(version, nextVal));
+                  }
+                }
 
                 if (VERBOSE) {
-                  verbose("adding id", id, "val=", nextVal,"DONE");
+                  verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
                 }
 
               }
-            }
+            // }   // end sync
 
             if (!before) {
               lastId = id;
@@ -319,7 +1200,11 @@ public class TestRealTimeGet extends Sol
         @Override
         public void run() {
           try {
-            while (operations.decrementAndGet() >= 0) {
+            while (operations.get() > 0) {
+              // throttle reads (don't completely stop)
+              readPermission.tryAcquire(10, TimeUnit.MILLISECONDS);
+
+
               // bias toward a recently changed doc
               int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
 
@@ -327,17 +1212,18 @@ public class TestRealTimeGet extends Sol
               // so when querying, we should first check the model, and then the index
 
               boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
-              long val;
+              DocInfo info;
 
               if (realTime) {
-                val = model.get(id);
+                info = visibleModel.get(id);
               } else {
                 synchronized(TestRealTimeGet.this) {
-                  val = committedModel.get(id);
+                  info = committedModel.get(id);
                 }
               }
 
-              if (VERBOSE) {
+
+              if  (VERBOSE) {
                 verbose("querying id", id);
               }
               SolrQueryRequest sreq;
@@ -355,9 +1241,11 @@ public class TestRealTimeGet extends Sol
               } else {
                 assertEquals(1, doclist.size());
                 long foundVal = (Long)(((Map)doclist.get(0)).get(field));
-                if (foundVal < Math.abs(val)) {
-                  verbose("ERROR, id", id, "foundVal=",foundVal,"model val=",val,"realTime=",realTime);
-                  assertTrue(foundVal >= Math.abs(val));
+                long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+                if (foundVer < Math.abs(info.version)
+                    || (foundVer == info.version && foundVal != info.val) ) {    // if the version matches, the val must match
+                  verbose("ERROR, id=", id, "found=",response,"model",info);
+                  assertTrue(false);
                 }
               }
             }
@@ -378,6 +1266,68 @@ public class TestRealTimeGet extends Sol
       thread.start();
     }
 
+    int bufferedAddsApplied = 0;
+    do {
+      assertTrue(uLog.getState() == UpdateLog.State.ACTIVE);
+
+      // before we start buffering updates, we want to point
+      // visibleModel away from the live model.
+
+      visibleModel = new ConcurrentHashMap<Integer, DocInfo>(model);
+
+      synchronized (stateChangeLock) {
+        uLog.bufferUpdates();
+      }
+
+      assertTrue(uLog.getState() == UpdateLog.State.BUFFERING);
+
+      // sometimes wait for a second to allow time for writers to write something
+      if (random.nextBoolean()) Thread.sleep(random.nextInt(10)+1);
+
+      Future<UpdateLog.RecoveryInfo> recoveryInfoF = uLog.applyBufferedUpdates();
+      if (recoveryInfoF != null) {
+        UpdateLog.RecoveryInfo recInfo = null;
+
+        int writeThreadNumber = 0;
+        while (recInfo == null) {
+          try {
+            // wait a short period of time for recovery to complete (and to give a chance for more writers to concurrently add docs)
+            recInfo = recoveryInfoF.get(random.nextInt(100/nWriteThreads), TimeUnit.MILLISECONDS);
+          } catch (TimeoutException e) {
+            // idle one more write thread
+            verbose("Operation",operations.get(),"Draining permits for write thread",writeThreadNumber);
+            writePermissions[writeThreadNumber++].drainPermits();
+            if (writeThreadNumber >= nWriteThreads) {
+              // if we hit the end, back up and give a few write permits
+              writeThreadNumber--;
+              writePermissions[writeThreadNumber].release(random.nextInt(2) + 1);
+            }
+
+            // throttle readers so they don't steal too much CPU from the recovery thread
+            readPermission.drainPermits();
+          }
+        }
+
+        bufferedAddsApplied += recInfo.adds;
+      }
+
+      // put all writers back at full blast
+      for (Semaphore writePerm : writePermissions) {
+        // I don't think semaphores check for overflow, so we need to check how many remain
+        int neededPermits = Integer.MAX_VALUE - writePerm.availablePermits();
+        if (neededPermits > 0) writePerm.release( neededPermits );
+      }
+
+      // put back readers at full blast and point back to live model
+      visibleModel = model;
+      int neededPermits = Integer.MAX_VALUE - readPermission.availablePermits();
+      if (neededPermits > 0) readPermission.release( neededPermits );
+
+      verbose("ROUND=",operations.get());
+    } while (operations.decrementAndGet() > 0);
+
+    verbose("bufferedAddsApplied=",bufferedAddsApplied);
+
     for (Thread thread : threads) {
       thread.join();
     }
@@ -387,6 +1337,10 @@ public class TestRealTimeGet extends Sol
 
 
 
+
+
+
+
   // The purpose of this test is to roughly model how solr uses lucene
   IndexReader reader;
   @Test
@@ -394,13 +1348,13 @@ public class TestRealTimeGet extends Sol
     final int commitPercent = 5 + random.nextInt(20);
     final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
     final int deletePercent = 4+random.nextInt(25);
-    final int deleteByQueryPercent = 0;  // real-time get isn't currently supported with delete-by-query
+    final int deleteByQueryPercent = 1+random.nextInt(5);
     final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
     int nWriteThreads = 5 + random.nextInt(25);
 
     final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
 
-    final AtomicLong operations = new AtomicLong(10000);  // number of query operations to perform in total - crank up if
+    final AtomicLong operations = new AtomicLong(1000);  // number of query operations to perform in total - crank up if
     int nReadThreads = 5 + random.nextInt(25);
     final boolean tombstones = random.nextBoolean();
     final boolean syncCommits = random.nextBoolean();
@@ -468,7 +1422,7 @@ public class TestRealTimeGet extends Sol
 
               if (oper < commitPercent) {
                 if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
-                  Map<Integer,Long> newCommittedModel;
+                  Map<Integer,DocInfo> newCommittedModel;
                   long version;
                   IndexReader oldReader;
 
@@ -485,7 +1439,7 @@ public class TestRealTimeGet extends Sol
                   if (reopenLock != null) reopenLock.lock();
 
                   synchronized(TestRealTimeGet.this) {
-                    newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
+                    newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
                     version = snapshotCount++;
                     oldReader = reader;
                     oldReader.incRef();  // increment the reference since we will use this for reopening
@@ -562,7 +1516,8 @@ public class TestRealTimeGet extends Sol
               // We can't concurrently update the same document and retain our invariants of increasing values
               // since we can't guarantee what order the updates will be executed.
               synchronized (sync) {
-                Long val = model.get(id);
+                DocInfo info = model.get(id);
+                long val = info.val;
                 long nextVal = Math.abs(val)+1;
 
                 if (oper < commitPercent + deletePercent) {
@@ -577,7 +1532,7 @@ public class TestRealTimeGet extends Sol
 
                   verbose("deleting id",id,"val=",nextVal);
                   writer.deleteDocuments(new Term("id",Integer.toString(id)));
-                  model.put(id, -nextVal);
+                  model.put(id, new DocInfo(0,-nextVal));
                   verbose("deleting id",id,"val=",nextVal,"DONE");
 
                 } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
@@ -594,7 +1549,7 @@ public class TestRealTimeGet extends Sol
 
                   verbose("deleteByQuery",id,"val=",nextVal);
                   writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
-                  model.put(id, -nextVal);
+                  model.put(id, new DocInfo(0,-nextVal));
                   verbose("deleteByQuery",id,"val=",nextVal,"DONE");
                 } else {
                   // model.put(id, nextVal);   // uncomment this and this test should fail.
@@ -612,7 +1567,7 @@ public class TestRealTimeGet extends Sol
                     verbose("deleting tombstone for id",id,"DONE");
                   }
 
-                  model.put(id, nextVal);
+                  model.put(id, new DocInfo(0,nextVal));
                   verbose("adding id",id,"val=",nextVal,"DONE");
                 }
               }
@@ -645,12 +1600,11 @@ public class TestRealTimeGet extends Sol
               // when indexing, we update the index, then the model
               // so when querying, we should first check the model, and then the index
 
-              long val;
-
+              DocInfo info;
               synchronized(TestRealTimeGet.this) {
-                val = committedModel.get(id);
+                info = committedModel.get(id);
               }
-
+              long val = info.val;
 
               IndexReader r;
               synchronized(TestRealTimeGet.this) {
@@ -728,7 +1682,7 @@ public class TestRealTimeGet extends Sol
     if (!termsEnum.seekExact(termBytes, false)) {
       return -1;
     }
-    DocsEnum docs = _TestUtil.docs(random, termsEnum, MultiFields.getLiveDocs(r), null, false);
+    DocsEnum docs = termsEnum.docs(MultiFields.getLiveDocs(r), null, false);
     int id = docs.nextDoc();
     if (id != DocIdSetIterator.NO_MORE_DOCS) {
       int next = docs.nextDoc();

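Note on the model change above: the test's model moves from Map<Integer,Long> to Map<Integer,DocInfo> so a version can ride along with each value. The holder's definition is not part of this hunk; a minimal sketch consistent with the new DocInfo(0, nextVal) call sites might look like the following (field names are inferred, not confirmed by this diff):

    // Hypothetical sketch of the DocInfo holder referenced above; the
    // constructor order (version, value) is inferred from the call sites.
    public static class DocInfo {
      public final long version; // update version; 0 where versioning is unused
      public final long val;     // model value; negated values mark deletions

      public DocInfo(long version, long val) {
        this.version = version;
        this.val = val;
      }
    }
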
Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java Wed Jan 25 20:32:44 2012
@@ -258,5 +258,85 @@ public class AutoCommitTest extends Abst
 
     assertQ("now it should", req("id:500") ,"//result[@numFound=1]" );
   }
+  
+  public void testCommitWithin() throws Exception {
+    SolrCore core = h.getCore();
+    NewSearcherListener trigger = new NewSearcherListener();    
+    core.registerNewSearcherListener(trigger);
+    DirectUpdateHandler2 updater = (DirectUpdateHandler2) core.getUpdateHandler();
+    CommitTracker tracker = updater.commitTracker;
+    tracker.setTimeUpperBound(0);
+    tracker.setDocsUpperBound(-1);
+    
+    XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
+    handler.init( null );
+    
+    MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
+    
+    // Add a single document with commitWithin == 1 second
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
+    req.setContentStreams( toContentStreams(
+      adoc(1000, "id", "529", "field_t", "what's inside?", "subject", "info"), null ) );
+    trigger.reset();
+    handler.handleRequest( req, rsp );
+
+    // Check it isn't in the index
+    assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" );
+    
+    // Wait longer than the commitWithin time
+    assertTrue("commitWithin failed to commit", trigger.waitForNewSearcher(30000));
+
+    // Add one document without commitWithin
+    req.setContentStreams( toContentStreams(
+        adoc("id", "530", "field_t", "what's inside?", "subject", "info"), null ) );
+    trigger.reset();
+    handler.handleRequest( req, rsp );
+
+    // Check it isn't in the index
+    assertQ("shouldn't find any", req("id:530") ,"//result[@numFound=0]" );
+    
+    // Delete one document with commitWithin
+    req.setContentStreams( toContentStreams(
+      delI("529", "commitWithin", "1000"), null ) );
+    trigger.reset();
+    handler.handleRequest( req, rsp );
+      
+    // Now make sure we can find it
+    assertQ("should find one", req("id:529") ,"//result[@numFound=1]" );
+    
+    // Wait for the commit to happen
+    assertTrue("commitWithin failed to commit", trigger.waitForNewSearcher(30000));
+    
+    // Now we shouldn't find it
+    assertQ("should find none", req("id:529") ,"//result[@numFound=0]" );
+    // ... but we should find the new one
+    assertQ("should find one", req("id:530") ,"//result[@numFound=1]" );
+    
+    trigger.reset();
+    
+    // now make the call 10 times in rapid succession and make sure it
+    // only commits once
+    req.setContentStreams( toContentStreams(
+        adoc(1000, "id", "500" ), null ) );
+    for( int i=0;i<10; i++ ) {
+      handler.handleRequest( req, rsp );
+    }
+    assertQ("should not be there yet", req("id:500") ,"//result[@numFound=0]" );
+    
+    // the same for the delete
+    req.setContentStreams( toContentStreams(
+        delI("530", "commitWithin", "1000"), null ) );
+    for( int i=0;i<10; i++ ) {
+      handler.handleRequest( req, rsp );
+    }
+    assertQ("should be there", req("id:530") ,"//result[@numFound=1]" );
+    
+    assertTrue("commitWithin failed to commit", trigger.waitForNewSearcher(30000));
+    assertQ("should be there", req("id:500") ,"//result[@numFound=1]" );
+    assertQ("should not be there", req("id:530") ,"//result[@numFound=0]" );
+    
+    assertEquals(3, tracker.getCommitCount());
+  }
 
 }

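The testCommitWithin test above drives commitWithin through the XML update handler. From a SolrJ client the same behavior can be requested via UpdateRequest.setCommitWithin; a minimal sketch, assuming the usual solrj imports and a server at localhost:8983 (URL and field values are illustrative):

    // Sketch: add a document and ask Solr to commit it within one second.
    public static void addWithCommitWithin() throws Exception {
      CommonsHttpSolrServer server =
          new CommonsHttpSolrServer("http://localhost:8983/solr");
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "529");
      UpdateRequest ureq = new UpdateRequest();
      ureq.add(doc);
      ureq.setCommitWithin(1000); // ms: a commit must happen within this window
      ureq.process(server);
    }
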
Modified: lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java (original)
+++ lucene/dev/branches/lucene3661/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java Wed Jan 25 20:32:44 2012
@@ -96,8 +96,8 @@ public class SoftAutoCommitTest extends 
     assertNotNull("soft529 wasn't fast enough", soft529);
     monitor.assertSaneOffers();
 
-    // check for the searcher, should have happend right after soft commit
-    Long searcher529 = monitor.searcher.poll(softCommitWaitMillis, MILLISECONDS);
+    // check for the searcher, should have happened right after soft commit
+    Long searcher529 = monitor.searcher.poll(softCommitWaitMillis * 3, MILLISECONDS);
     assertNotNull("searcher529 wasn't fast enough", searcher529);
     monitor.assertSaneOffers();
 
@@ -118,7 +118,7 @@ public class SoftAutoCommitTest extends 
 
     // however slow the machine was to do the soft commit compared to expected,
     // assume newSearcher had some magnitude of that much overhead as well 
-    long slowTestFudge = Math.max(100, 6 * (soft529 - add529 - softCommitWaitMillis));
+    long slowTestFudge = Math.max(200, 12 * (soft529 - add529 - softCommitWaitMillis));
     assertTrue("searcher529 wasn't soon enough after soft529: " +
                searcher529 + " !< " + soft529 + " + " + slowTestFudge + " (fudge)",
                searcher529 < soft529 + slowTestFudge );
@@ -197,10 +197,10 @@ public class SoftAutoCommitTest extends 
 
     assertNotNull("manCommit wasn't fast enough", manCommit);
     assertTrue("forced manCommit didn't happen when it should have: " + 
-        manCommit + " !< " + postAdd529, 
-        manCommit < postAdd529);
+        manCommit + " !<= " + postAdd529, 
+        manCommit <= postAdd529);
     
-    Long hard529 = monitor.hard.poll(hardCommitWaitMillis, MILLISECONDS);
+    Long hard529 = monitor.hard.poll(hardCommitWaitMillis * 2, MILLISECONDS);
     assertNotNull("hard529 wasn't fast enough", hard529);
 
     monitor.assertSaneOffers();

Modified: lucene/dev/branches/lucene3661/solr/example/solr/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/example/solr/conf/schema.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/example/solr/conf/schema.xml (original)
+++ lucene/dev/branches/lucene3661/solr/example/solr/conf/schema.xml Wed Jan 25 20:32:44 2012
@@ -532,6 +532,9 @@
 
    <field name="payloads" type="payloads" indexed="true" stored="true"/>
 
+
+   <field name="_version_" type="long" indexed="true" stored="true"/>
+
    <!-- Uncommenting the following will create a "timestamp" field using
         a default value of "NOW" to indicate when each document was indexed.
      -->

Modified: lucene/dev/branches/lucene3661/solr/example/solr/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/example/solr/conf/solrconfig.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/example/solr/conf/solrconfig.xml (original)
+++ lucene/dev/branches/lucene3661/solr/example/solr/conf/solrconfig.xml Wed Jan 25 20:32:44 2012
@@ -344,11 +344,9 @@
     <!-- Enables a transaction log, currently used for real-time get.
          "dir" - the target directory for transaction logs, defaults to the
             solr data directory.  --> 
-    <!--
-    <updateLog class="solr.FSUpdateLog">
+    <updateLog>
       <str name="dir">${solr.data.dir:}</str>
     </updateLog>
-    -->
    
 
   </updateHandler>
@@ -1072,6 +1070,13 @@
        </lst>
      </requestHandler>
     -->
+    
+    <!-- Solr Replication for SolrCloud Recovery
+    
+         This is the config needed for SolrCloud's recovery replication.
+    -->
+    <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" />
+
 
   <!-- Search Components
 

Modified: lucene/dev/branches/lucene3661/solr/example/solr/solr.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/example/solr/solr.xml?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/example/solr/solr.xml (original)
+++ lucene/dev/branches/lucene3661/solr/example/solr/solr.xml Wed Jan 25 20:32:44 2012
@@ -28,7 +28,7 @@
   adminPath: RequestHandler path to manage cores.  
     If 'null' (or absent), cores will not be manageable via request handler
   -->
-  <cores adminPath="/admin/cores" defaultCoreName="collection1">
-    <core name="collection1" instanceDir="." shard="shard1"/>
+  <cores adminPath="/admin/cores" defaultCoreName="collection1" host="${host:}" hostPort="${jetty.port:}">
+    <core name="collection1" instanceDir="." />
   </cores>
 </solr>

Modified: lucene/dev/branches/lucene3661/solr/lib/apache-solr-noggit-pom.xml.template
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/lib/apache-solr-noggit-pom.xml.template?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/lib/apache-solr-noggit-pom.xml.template (original)
+++ lucene/dev/branches/lucene3661/solr/lib/apache-solr-noggit-pom.xml.template Wed Jan 25 20:32:44 2012
@@ -31,6 +31,6 @@
   <artifactId>solr-noggit</artifactId>
   <name>Solr Specific Noggit</name>
   <version>@version@</version>
-  <description>Solr Specific Noggit r1209632</description>
+  <description>Solr Specific Noggit r1211150</description>
   <packaging>jar</packaging>
 </project>

Modified: lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java (original)
+++ lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java Wed Jan 25 20:32:44 2012
@@ -226,7 +226,22 @@ public abstract class SolrServer impleme
    * @throws IOException
    */
   public UpdateResponse deleteById(String id) throws SolrServerException, IOException {
-    return new UpdateRequest().deleteById( id ).process( this );
+    return deleteById(id, -1);
+  }
+
+  /**
+   * Deletes a single document by unique ID, specifying max time before commit
+   * @param id  the ID of the document to delete
+   * @param commitWithinMs  max time (in ms) before a commit will happen 
+   * @throws SolrServerException
+   * @throws IOException
+   * @since 3.6
+   */
+  public UpdateResponse deleteById(String id, int commitWithinMs) throws SolrServerException, IOException {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteById(id);
+    req.setCommitWithin(commitWithinMs);
+    return req.process(this);
   }
 
   /**
@@ -236,7 +251,22 @@ public abstract class SolrServer impleme
    * @throws IOException
    */
   public UpdateResponse deleteById(List<String> ids) throws SolrServerException, IOException {
-    return new UpdateRequest().deleteById( ids ).process( this );
+    return deleteById(ids, -1);
+  }
+
+  /**
+   * Deletes a list of documents by unique ID, specifying max time before commit
+   * @param ids  the list of document IDs to delete 
+   * @param commitWithinMs  max time (in ms) before a commit will happen 
+   * @throws SolrServerException
+   * @throws IOException
+   * @since 3.6
+   */
+  public UpdateResponse deleteById(List<String> ids, int commitWithinMs) throws SolrServerException, IOException {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteById(ids);
+    req.setCommitWithin(commitWithinMs);
+    return req.process(this);
   }
 
   /**
@@ -246,7 +276,22 @@ public abstract class SolrServer impleme
    * @throws IOException
    */
   public UpdateResponse deleteByQuery(String query) throws SolrServerException, IOException {
-    return new UpdateRequest().deleteByQuery( query ).process( this );
+    return deleteByQuery(query, -1);
+  }
+
+  /**
+   * Deletes documents from the index based on a query, specifying max time before commit
+   * @param query  the query expressing what documents to delete
+   * @param commitWithinMs  max time (in ms) before a commit will happen 
+   * @throws SolrServerException
+   * @throws IOException
+   * @since 3.6
+   */
+  public UpdateResponse deleteByQuery(String query, int commitWithinMs) throws SolrServerException, IOException {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteByQuery(query);
+    req.setCommitWithin(commitWithinMs);
+    return req.process(this);
   }
 
   /**

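For reference, the new overloads added above can be exercised as follows (a sketch assuming the usual solrj imports; the server URL and ids are illustrative, not part of this patch):

    // Sketch: the new commitWithin-aware delete overloads (@since 3.6).
    public static void deleteExamples() throws SolrServerException, IOException {
      SolrServer server = new CommonsHttpSolrServer("http://localhost:8983/solr");
      server.deleteById("529", 1000);                       // commit within 1s
      server.deleteById(Arrays.asList("530", "531"), 1000); // list variant
      server.deleteByQuery("subject:info", -1);             // -1: no commitWithin
    }
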
Modified: lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Wed Jan 25 20:32:44 2012
@@ -28,16 +28,23 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.KeeperException;
 
 public class CloudSolrServer extends SolrServer {
@@ -48,12 +55,14 @@ public class CloudSolrServer extends Sol
   private String defaultCollection;
   private LBHttpSolrServer lbServer;
   Random rand = new Random();
-
+  private MultiThreadedHttpConnectionManager connManager;
   /**
    * @param zkHost The address of the zookeeper quorum containing the cloud state
    */
   public CloudSolrServer(String zkHost) throws MalformedURLException {
-      this(zkHost, new LBHttpSolrServer());
+      connManager = new MultiThreadedHttpConnectionManager();
+      this.zkHost = zkHost;
+      this.lbServer = new LBHttpSolrServer(new HttpClient(connManager));
   }
 
   /**
@@ -88,42 +97,58 @@ public class CloudSolrServer extends Sol
    * @throws InterruptedException
    */
   public void connect() {
-    if (zkStateReader != null) return;
-    synchronized(this) {
-      if (zkStateReader != null) return;
-      try {
-        ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
-        zk.makeCollectionsNodeWatches();
-        zk.makeShardZkNodeWatches(false);
-        zk.updateCloudState(true);
-        zkStateReader = zk;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      } catch (KeeperException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
-      } catch (IOException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
-      } catch (TimeoutException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    if (zkStateReader == null) {
+      synchronized (this) {
+        if (zkStateReader == null) {
+          try {
+            ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout,
+                zkClientTimeout);
+            zk.createClusterStateWatchersAndUpdate();
+            zkStateReader = zk;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (KeeperException e) {
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (IOException e) {
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          } catch (TimeoutException e) {
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
+          }
+        }
       }
     }
   }
 
-
   @Override
   public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
     connect();
 
+    // TODO: if you can hash here, you could favor the shard leader
+    
     CloudState cloudState = zkStateReader.getCloudState();
 
-    String collection = request.getParams().get("collection", defaultCollection);
-
-    // TODO: allow multiple collections to be specified via comma separated list
+    SolrParams reqParams = request.getParams();
+    if (reqParams == null) {
+      reqParams = new ModifiableSolrParams();
+    }
+    String collection = reqParams.get("collection", defaultCollection);
+    
+    // Extract each comma separated collection name and store in a List.
+    List<String> collectionList = StrUtils.splitSmart(collection, ",", true);
+    
+    // Retrieve slices from the cloud state and, for each collection specified,
+    // add it to the Map of slices.
+    Map<String,Slice> slices = new HashMap<String,Slice>();
+    for (int i = 0; i < collectionList.size(); i++) {
+      String coll = collectionList.get(i);
+      ClientUtils.appendMap(coll, slices, cloudState.getSlices(coll));
+    }
 
-    Map<String,Slice> slices = cloudState.getSlices(collection);
     Set<String> liveNodes = cloudState.getLiveNodes();
 
     // IDEA: have versions on various things... like a global cloudState version
@@ -136,18 +161,21 @@ public class CloudSolrServer extends Sol
     List<String> urlList = new ArrayList<String>();
     for (Slice slice : slices.values()) {
       for (ZkNodeProps nodeProps : slice.getShards().values()) {
-        String node = nodeProps.get(ZkStateReader.NODE_NAME);
-        if (!liveNodes.contains(node)) continue;
+        ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+        String node = coreNodeProps.getNodeName();
+        if (!liveNodes.contains(coreNodeProps.getNodeName())
+            || !coreNodeProps.getState().equals(
+                ZkStateReader.ACTIVE)) continue;
         if (nodes.put(node, nodeProps) == null) {
-          String url = nodeProps.get(ZkStateReader.URL_PROP);
+          String url = coreNodeProps.getCoreUrl();
           urlList.add(url);
         }
       }
     }
 
     Collections.shuffle(urlList, rand);
-    // System.out.println("########################## MAKING REQUEST TO " + urlList);
-    // TODO: set distrib=true if we detected more than one shard?
+    //System.out.println("########################## MAKING REQUEST TO " + urlList);
+ 
     LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, urlList);
     LBHttpSolrServer.Rsp rsp = lbServer.request(req);
     return rsp.getResponse();
@@ -161,5 +189,12 @@ public class CloudSolrServer extends Sol
         zkStateReader = null;
       }
     }
+    if (connManager != null) {
+      connManager.shutdown();
+    }
+  }
+
+  public LBHttpSolrServer getLbServer() {
+    return lbServer;
   }
 }

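The request routing above now honors a comma-separated collection list. A usage sketch (zkHost and collection names are illustrative; assumes the usual solrj imports):

    // Sketch: query two collections in one request through CloudSolrServer.
    public static void multiCollectionQuery() throws Exception {
      CloudSolrServer server = new CloudSolrServer("zkhost1:2181");
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("q", "*:*");
      params.set("collection", "collection1,collection2"); // split via splitSmart
      QueryResponse rsp = server.query(params);
      System.out.println("hits: " + rsp.getResults().getNumFound());
    }
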
Modified: lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java Wed Jan 25 20:32:44 2012
@@ -139,6 +139,8 @@ public class CommonsHttpSolrServer exten
    * with single-part requests.
    */
   private boolean useMultiPartPost;
+
+  private boolean shutdownHttpClient = false;
   
   /**  
    * @param solrServerUrl The URL of the Solr server.  For 
@@ -204,6 +206,7 @@ public class CommonsHttpSolrServer exten
     }
 
     if (client == null) {
+      shutdownHttpClient  = true;
       _httpClient = new HttpClient(new MultiThreadedHttpConnectionManager()) ;
 
       // prevent retries  (note: this didn't work when set on mgr.. needed to be set on client)
@@ -669,4 +672,12 @@ public class CommonsHttpSolrServer exten
     req.setCommitWithin(commitWithinMs);
     return req.process(this);
   }
+  
+  public void shutdown() {
+    if (shutdownHttpClient && _httpClient != null
+        && _httpClient.getHttpConnectionManager() instanceof MultiThreadedHttpConnectionManager) {
+      ((MultiThreadedHttpConnectionManager) _httpClient
+          .getHttpConnectionManager()).shutdown();
+    }
+  }
 }

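The shutdownHttpClient flag above means the server only tears down a connection manager it created itself. When a caller supplies its own HttpClient, the caller keeps ownership; a sketch (URL is illustrative):

    // Sketch: supplying our own HttpClient - the new shutdown() will not
    // touch it (shutdownHttpClient stays false), so the caller cleans up.
    public static void callerOwnedClient() throws Exception {
      MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
      CommonsHttpSolrServer server = new CommonsHttpSolrServer(
          new URL("http://localhost:8983/solr"), new HttpClient(mgr));
      // ... issue requests ...
      server.shutdown(); // no-op for a caller-supplied client
      mgr.shutdown();    // caller releases its own connections
    }
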
Modified: lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Wed Jan 25 20:32:44 2012
@@ -28,6 +28,8 @@ import org.apache.solr.common.SolrExcept
 import java.io.IOException;
 import java.lang.ref.WeakReference;
 import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -249,16 +251,26 @@ public class LBHttpSolrServer extends So
         rsp.rsp = server.request(req.getRequest());
         return rsp; // SUCCESS
       } catch (SolrException e) {
-        // Server is alive but the request was malformed or invalid
-        throw e;
+        // we retry on 404, 403, 503 or 500 - you can see these on solr shutdown
+        if (e.code() == 404 || e.code() == 403 || e.code() == 503 || e.code() == 500) {
+          ex = addZombie(server, e);
+        } else {
+          // Server is alive but the request was likely malformed or invalid
+          throw e;
+        }
+       
+       // TODO: consider adding the message checks below to the condition above - currently
+       // they cause a problem with distrib updates: they seem to match a failed
+       // forward-to-leader exception as well...
+       //     || e.getMessage().contains("java.net.SocketException")
+       //     || e.getMessage().contains("java.net.ConnectException")
+      } catch (SocketException e) {
+        ex = addZombie(server, e);
+      } catch (SocketTimeoutException e) {
+        ex = addZombie(server, e);
       } catch (SolrServerException e) {
-        if (e.getRootCause() instanceof IOException) {
-          ex = e;
-          wrapper = new ServerWrapper(server);
-          wrapper.lastUsed = System.currentTimeMillis();
-          wrapper.standard = false;
-          zombieServers.put(wrapper.getKey(), wrapper);
-          startAliveCheckExecutor();
+        Throwable rootCause = e.getRootCause();
+        if (rootCause instanceof IOException) {
+          ex = addZombie(server, e);
         } else {
           throw e;
         }
@@ -274,11 +286,23 @@ public class LBHttpSolrServer extends So
         zombieServers.remove(wrapper.getKey());
         return rsp; // SUCCESS
       } catch (SolrException e) {
-        // Server is alive but the request was malformed or invalid
-        zombieServers.remove(wrapper.getKey());
-        throw e;
+        // we retry on 404, 403, 503 or 500 - you can see these on solr shutdown
+        if (e.code() == 404 || e.code() == 403 || e.code() == 503 || e.code() == 500) {
+          ex = e;
+          // already a zombie, no need to re-add
+        } else {
+          // Server is alive but the request was malformed or invalid
+          zombieServers.remove(wrapper.getKey());
+          throw e;
+        }
+
+      } catch (SocketException e) {
+        ex = e;
+      } catch (SocketTimeoutException e) {
+        ex = e;
       } catch (SolrServerException e) {
-        if (e.getRootCause() instanceof IOException) {
+        Throwable rootCause = e.getRootCause();
+        if (rootCause instanceof IOException) {
           ex = e;
           // already a zombie, no need to re-add
         } else {
@@ -293,9 +317,22 @@ public class LBHttpSolrServer extends So
     if (ex == null) {
       throw new SolrServerException("No live SolrServers available to handle this request");
     } else {
-      throw new SolrServerException("No live SolrServers available to handle this request", ex);
+      throw new SolrServerException("No live SolrServers available to handle this request: " + zombieServers.keySet(), ex);
     }
 
+  }
+
+  private Exception addZombie(CommonsHttpSolrServer server,
+      Exception e) {
+
+    ServerWrapper wrapper;
+
+    wrapper = new ServerWrapper(server);
+    wrapper.lastUsed = System.currentTimeMillis();
+    wrapper.standard = false;
+    zombieServers.put(wrapper.getKey(), wrapper);
+    startAliveCheckExecutor();
+    return e;
   }  
 
 
@@ -363,6 +400,12 @@ public class LBHttpSolrServer extends So
   public void setSoTimeout(int timeout) {
     httpClient.getParams().setSoTimeout(timeout);
   }
+  
+  public void shutdown() {
+    if (aliveCheckExecutor != null) {
+      aliveCheckExecutor.shutdownNow();
+    }
+  }
 
   /**
    * Tries to query a live server. A SolrServerException is thrown if all servers are dead.

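With the changes above, a node answering 404/403/503/500 or failing at the socket level is parked as a zombie and periodically re-checked instead of failing the request outright. A client-side sketch (URLs are illustrative; assumes the usual solrj imports):

    // Sketch: a two-node LB client; nodes failing with the retryable codes or
    // socket errors become "zombies" probed by the alive-check executor.
    public static void lbExample() throws Exception {
      LBHttpSolrServer lb = new LBHttpSolrServer(
          "http://host1:8983/solr", "http://host2:8983/solr");
      lb.query(new SolrQuery("*:*"));
      lb.shutdown(); // stops the alive-check executor added above
    }
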
Modified: lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java?rev=1235919&r1=1235918&r2=1235919&view=diff
==============================================================================
--- lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java (original)
+++ lucene/dev/branches/lucene3661/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java Wed Jan 25 20:32:44 2012
@@ -20,8 +20,14 @@ package org.apache.solr.client.solrj.imp
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.MalformedURLException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -31,13 +37,10 @@ import org.apache.commons.httpclient.met
 import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.util.NamedList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -300,4 +303,33 @@ public class StreamingUpdateSolrServer e
   {
     log.error( "error", ex );
   }
+  
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    scheduler.shutdown();
+    try {
+      if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+        scheduler.shutdownNow();
+        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS))
+          log.error("ExecutorService did not terminate");
+      }
+    } catch (InterruptedException ie) {
+      scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+  
+  
+  public void shutdownNow() {
+    super.shutdown();
+    scheduler.shutdownNow(); // Cancel currently executing tasks
+    try {
+      if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
+        log.error("ExecutorService did not terminate");
+    } catch (InterruptedException ie) {
+      scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
 }
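
The two termination paths added above differ in how they treat queued work; a usage sketch (server URL and constructor arguments are illustrative):

    // Sketch: graceful vs. immediate termination of the streaming client.
    public static void streamingShutdown() throws Exception {
      StreamingUpdateSolrServer server =
          new StreamingUpdateSolrServer("http://localhost:8983/solr", 100, 4);
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "1");
      server.add(doc);   // queued; background runners stream it to Solr
      server.commit();
      server.shutdown(); // drains queued work, waiting up to 2 x 60 seconds
      // server.shutdownNow() would instead cancel in-flight tasks immediately
    }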