You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by us...@apache.org on 2012/01/25 22:56:51 UTC
svn commit: r1235969 [7/9] - in /lucene/dev/branches/lucene2858: ./
dev-tools/eclipse/ dev-tools/idea/lucene/contrib/ dev-tools/maven/ lucene/
lucene/contrib/
lucene/contrib/sandbox/src/test/org/apache/lucene/sandbox/queries/regex/
lucene/src/java/org/...
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Wed Jan 25 21:56:44 2012
@@ -17,14 +17,6 @@
package org.apache.solr.search;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@@ -33,17 +25,32 @@ import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util._TestUtil;
import org.apache.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import static org.apache.solr.core.SolrCore.verbose;
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER;
public class TestRealTimeGet extends SolrTestCaseJ4 {
+ private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica)
+
@BeforeClass
public static void beforeClass() throws Exception {
@@ -59,10 +66,26 @@ public class TestRealTimeGet extends Sol
assertJQ(req("q","id:1")
,"/response/numFound==0"
);
- assertJQ(req("qt","/get","id","1")
+ assertJQ(req("qt","/get", "id","1", "fl","id")
,"=={'doc':{'id':'1'}}"
);
- assertJQ(req("qt","/get","ids","1")
+ assertJQ(req("qt","/get","ids","1", "fl","id")
+ ,"=={" +
+ " 'response':{'numFound':1,'start':0,'docs':[" +
+ " {" +
+ " 'id':'1'}]" +
+ " }}}"
+ );
+
+ assertU(commit());
+
+ assertJQ(req("q","id:1")
+ ,"/response/numFound==1"
+ );
+ assertJQ(req("qt","/get","id","1", "fl","id")
+ ,"=={'doc':{'id':'1'}}"
+ );
+ assertJQ(req("qt","/get","ids","1", "fl","id")
,"=={" +
" 'response':{'numFound':1,'start':0,'docs':[" +
" {" +
@@ -70,127 +93,939 @@ public class TestRealTimeGet extends Sol
" }}}"
);
- assertU(commit());
+ assertU(delI("1"));
+
+ assertJQ(req("q","id:1")
+ ,"/response/numFound==1"
+ );
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':null}"
+ );
+ assertJQ(req("qt","/get","ids","1")
+ ,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
+ );
+
+
+ assertU(adoc("id","10"));
+ assertU(adoc("id","11"));
+ assertJQ(req("qt","/get","id","10", "fl","id")
+ ,"=={'doc':{'id':'10'}}"
+ );
+ assertU(delQ("id:10 abcdef"));
+ assertJQ(req("qt","/get","id","10")
+ ,"=={'doc':null}"
+ );
+ assertJQ(req("qt","/get","id","11", "fl","id")
+ ,"=={'doc':{'id':'11'}}"
+ );
+
+
+ }
+
+
+ @Test
+ public void testVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ long version = addAndGetVersion(sdoc("id","1") , null);
+
+ assertJQ(req("q","id:1")
+ ,"/response/numFound==0"
+ );
+
+ // test version is there from rtg
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // test version is there from the index
+ assertU(commit());
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate an update from the leader
+ version += 10;
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test version is there from rtg
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate reordering: test that a version less than that does not take effect
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate reordering: test that a delete w/ version less than that does not take effect
+ // TODO: also allow passing version on delete instead of on URL?
+ updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // make sure reordering detection also works after a commit
+ assertU(commit());
+
+ // simulate reordering: test that a version less than that does not take effect
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate reordering: test that a delete w/ version less than that does not take effect
+ updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // now simulate a normal delete from the leader
+ version += 5;
+ updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version)));
+
+ // make sure a reordered add doesn't take effect.
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that it's still deleted
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':null}"
+ );
+
+ // test that we can remember the version of a delete after a commit
+ assertU(commit());
+
+ // make sure a reordered add doesn't take effect.
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that it's still deleted
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':null}"
+ );
+
+ version = addAndGetVersion(sdoc("id","2"), null);
+ long version2 = deleteByQueryAndGetVersion("id:2", null);
+ assertTrue(Math.abs(version2) > version );
+
+ }
+
+
+
+ /***
+ @Test
+ public void testGetRealtime() throws Exception {
+ SolrQueryRequest sr1 = req("q","foo");
+ IndexReader r1 = sr1.getCore().getRealtimeReader();
+
+ assertU(adoc("id","1"));
+
+ IndexReader r2 = sr1.getCore().getRealtimeReader();
+ assertNotSame(r1, r2);
+ int refcount = r2.getRefCount();
+
+ // make sure a new reader wasn't opened
+ IndexReader r3 = sr1.getCore().getRealtimeReader();
+ assertSame(r2, r3);
+ assertEquals(refcount+1, r3.getRefCount());
+
+ assertU(commit());
+
+ // this is not critical, but currently a commit does not refresh the reader
+ // if nothing has changed
+ IndexReader r4 = sr1.getCore().getRealtimeReader();
+ assertEquals(refcount+2, r4.getRefCount());
+
+
+ r1.decRef();
+ r2.decRef();
+ r3.decRef();
+ r4.decRef();
+ sr1.close();
+ }
+ ***/
+
+
+ final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<Integer,DocInfo>();
+ Map<Integer,DocInfo> committedModel = new HashMap<Integer,DocInfo>();
+ long snapshotCount;
+ long committedModelClock;
+ volatile int lastId;
+ final String field = "val_l";
+ Object[] syncArr;
+
+ private void initModel(int ndocs) {
+ snapshotCount = 0;
+ committedModelClock = 0;
+ lastId = 0;
+
+ syncArr = new Object[ndocs];
+
+ for (int i=0; i<ndocs; i++) {
+ model.put(i, new DocInfo(0, -1L));
+ syncArr[i] = new Object();
+ }
+ committedModel.putAll(model);
+ }
+
+
+ static class DocInfo {
+ long version;
+ long val;
+
+ public DocInfo(long version, long val) {
+ this.version = version;
+ this.val = val;
+ }
+
+ public String toString() {
+ return "{version="+version+",val="+val+"\"";
+ }
+ }
+
+ @Test
+ public void testStressGetRealtime() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out);
+
+ final int commitPercent = 5 + random.nextInt(20);
+ final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random.nextInt(25);
+ final int deleteByQueryPercent = 1+random.nextInt(5);
+ final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+ int nWriteThreads = 5 + random.nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 60;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 5 + random.nextInt(25);
+
+
+ verbose("commitPercent=", commitPercent);
+ verbose("softCommitPercent=",softCommitPercent);
+ verbose("deletePercent=",deletePercent);
+ verbose("deleteByQueryPercent=", deleteByQueryPercent);
+ verbose("ndocs=", ndocs);
+ verbose("nWriteThreads=", nWriteThreads);
+ verbose("nReadThreads=", nReadThreads);
+ verbose("percentRealtimeQuery=", percentRealtimeQuery);
+ verbose("maxConcurrentCommits=", maxConcurrentCommits);
+ verbose("operations=", operations);
+
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestRealTimeGet.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ verbose("took snapshot version=",version);
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
+
+ synchronized(TestRealTimeGet.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id = rand.nextInt(ndocs);
+ Object sync = syncArr[id];
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ // We can't concurrently update the same document and retain our invariants of increasing values
+ // since we can't guarantee what order the updates will be executed.
+ // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+ synchronized (sync) {
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ if (oper < commitPercent + deletePercent) {
+ if (VERBOSE) {
+ verbose("deleting id",id,"val=",nextVal);
+ }
+
+ // assertU("<delete><id>" + id + "</id></delete>");
+ Long version = deleteAndGetVersion(Integer.toString(id), null);
+
+ model.put(id, new DocInfo(version, -nextVal));
+ if (VERBOSE) {
+ verbose("deleting id", id, "val=",nextVal,"DONE");
+ }
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ if (VERBOSE) {
+ verbose("deleteByQuery id ",id, "val=",nextVal);
+ }
+
+ assertU("<delete><query>id:" + id + "</query></delete>");
+ model.put(id, new DocInfo(-1L, -nextVal));
+ if (VERBOSE) {
+ verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
+ }
+ } else {
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal);
+ }
+
+ // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+ Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)), null);
+ model.put(id, new DocInfo(version, nextVal));
+
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"DONE");
+ }
+
+ }
+ } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestRealTimeGet.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVal < Math.abs(info.val)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ }
+
+
+ // This version doesn't synchronize on id to tell what update won, but instead uses versions
+ @Test
+ public void testStressGetRealtimeVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ final int commitPercent = 5 + random.nextInt(20);
+ final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random.nextInt(25);
+ final int deleteByQueryPercent = 1 + random.nextInt(5);
+ final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+ int nWriteThreads = 5 + random.nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 75;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 5 + random.nextInt(25);
+
+
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestRealTimeGet.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
+
+ synchronized(TestRealTimeGet.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id = rand.nextInt(ndocs);
+ Object sync = syncArr[id];
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ // We can't concurrently update the same document and retain our invariants of increasing values
+ // since we can't guarantee what order the updates will be executed.
+ // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+ //
+ // NOTE: versioning means we can now remove the sync and tell what update "won"
+ // synchronized (sync) {
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ if (oper < commitPercent + deletePercent) {
+ verbose("deleting id",id,"val=",nextVal);
+
+ Long version = deleteAndGetVersion(Integer.toString(id), null);
+ assertTrue(version < 0);
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ verbose("deleting id", id, "val=",nextVal,"DONE");
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ verbose("deleteByQyery id",id,"val=",nextVal);
+
+ Long version = deleteByQueryAndGetVersion("id:"+Integer.toString(id), null);
+ assertTrue(version < 0);
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ verbose("deleteByQyery id", id, "val=",nextVal,"DONE");
+ } else {
+ verbose("adding id", id, "val=", nextVal);
+
+ // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+ Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)), null);
+ assertTrue(version > 0);
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"DONE");
+ }
+
+ }
+ // } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestRealTimeGet.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ }
+
+ // This version simulates updates coming from the leader and sometimes being reordered
+ @Test
+ public void testStressReorderVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ final int commitPercent = 5 + random.nextInt(20);
+ final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random.nextInt(25);
+ final int deleteByQueryPercent = 0; // delete-by-query can't be reordered on replicas
+ final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+ int nWriteThreads = 5 + random.nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 75;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 5 + random.nextInt(25);
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+
+ final AtomicLong testVersion = new AtomicLong(0);
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestRealTimeGet.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
+
+ synchronized(TestRealTimeGet.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id;
+
+ if (rand.nextBoolean()) {
+ id = rand.nextInt(ndocs);
+ } else {
+ id = lastId; // reuse the last ID half of the time to force more race conditions
+ }
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ DocInfo info = model.get(id);
- assertJQ(req("q","id:1")
- ,"/response/numFound==1"
- );
- assertJQ(req("qt","/get","id","1")
- ,"=={'doc':{'id':'1'}}"
- );
- assertJQ(req("qt","/get","ids","1")
- ,"=={" +
- " 'response':{'numFound':1,'start':0,'docs':[" +
- " {" +
- " 'id':'1'}]" +
- " }}}"
- );
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
- assertU(delI("1"));
+ // the version we set on the update should determine who wins
+ // These versions are not derived from the actual leader update handler and hence this
+ // test may need to change depending on how we handle version numbers.
+ long version = testVersion.incrementAndGet();
- assertJQ(req("q","id:1")
- ,"/response/numFound==1"
- );
- assertJQ(req("qt","/get","id","1")
- ,"=={'doc':null}"
- );
- assertJQ(req("qt","/get","ids","1")
- ,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
- );
+ // yield after getting the next version to increase the odds of updates happening out of order
+ if (rand.nextBoolean()) Thread.yield();
- }
+ if (oper < commitPercent + deletePercent) {
+ verbose("deleting id",id,"val=",nextVal,"version",version);
+ Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
- /***
- @Test
- public void testGetRealtime() throws Exception {
- SolrQueryRequest sr1 = req("q","foo");
- IndexReader r1 = sr1.getCore().getRealtimeReader();
+ // TODO: returning versions for these types of updates is redundant
+ // but if we do return, they had better be equal
+ if (returnedVersion != null) {
+ assertEquals(-version, returnedVersion.longValue());
+ }
- assertU(adoc("id","1"));
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
- IndexReader r2 = sr1.getCore().getRealtimeReader();
- assertNotSame(r1, r2);
- int refcount = r2.getRefCount();
+ verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
- // make sure a new reader wasn't opened
- IndexReader r3 = sr1.getCore().getRealtimeReader();
- assertSame(r2, r3);
- assertEquals(refcount+1, r3.getRefCount());
+ } else {
+ verbose("adding id", id, "val=", nextVal,"version",version);
- assertU(commit());
+ Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ if (returnedVersion != null) {
+ assertEquals(version, returnedVersion.longValue());
+ }
- // this is not critical, but currently a commit does not refresh the reader
- // if nothing has changed
- IndexReader r4 = sr1.getCore().getRealtimeReader();
- assertEquals(refcount+2, r4.getRefCount());
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
+ }
- r1.decRef();
- r2.decRef();
- r3.decRef();
- r4.decRef();
- sr1.close();
- }
- ***/
+ }
+ // } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+ threads.add(thread);
+ }
- final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
- Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
- long snapshotCount;
- long committedModelClock;
- volatile int lastId;
- final String field = "val_l";
- Object[] syncArr;
- private void initModel(int ndocs) {
- snapshotCount = 0;
- committedModelClock = 0;
- lastId = 0;
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random.nextInt());
- syncArr = new Object[ndocs];
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
- for (int i=0; i<ndocs; i++) {
- model.put(i, -1L);
- syncArr[i] = new Object();
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestRealTimeGet.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
}
- committedModel.putAll(model);
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
}
+ // This points to the live model when state is ACTIVE, but a snapshot of the
+ // past when recovering.
+ volatile ConcurrentHashMap<Integer,DocInfo> visibleModel;
+
+ // This version simulates updates coming from the leader and sometimes being reordered
+ // and tests the ability to buffer updates and apply them later
@Test
- public void testStressGetRealtime() throws Exception {
+ public void testStressRecovery() throws Exception {
clearIndex();
assertU(commit());
- // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out);
-
- final int commitPercent = 5 + random.nextInt(20);
+ final int commitPercent = 5 + random.nextInt(10);
final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
final int deletePercent = 4+random.nextInt(25);
final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query
final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
- int nWriteThreads = 5 + random.nextInt(25);
+ int nWriteThreads = 2 + random.nextInt(10); // fewer write threads to give recovery thread more of a chance
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
// query variables
- final int percentRealtimeQuery = 60;
- // final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
- final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
- int nReadThreads = 5 + random.nextInt(25);
-
-
- verbose("commitPercent=", commitPercent);
- verbose("softCommitPercent=",softCommitPercent);
- verbose("deletePercent=",deletePercent);
- verbose("deleteByQueryPercent=", deleteByQueryPercent);
- verbose("ndocs=", ndocs);
- verbose("nWriteThreads=", nWriteThreads);
- verbose("nReadThreads=", nReadThreads);
- verbose("percentRealtimeQuery=", percentRealtimeQuery);
- verbose("maxConcurrentCommits=", maxConcurrentCommits);
- verbose("operations=", operations);
-
+ final int percentRealtimeQuery = 75;
+ final AtomicLong operations = new AtomicLong(atLeast(75)); // number of recovery loops to perform
+ int nReadThreads = 2 + random.nextInt(10); // fewer read threads to give writers more of a chance
initModel(ndocs);
@@ -198,39 +1033,63 @@ public class TestRealTimeGet extends Sol
List<Thread> threads = new ArrayList<Thread>();
+
+ final AtomicLong testVersion = new AtomicLong(0);
+
+
+ final UpdateHandler uHandler = h.getCore().getUpdateHandler();
+ final UpdateLog uLog = uHandler.getUpdateLog();
+ final VersionInfo vInfo = uLog.getVersionInfo();
+ final Object stateChangeLock = new Object();
+ this.visibleModel = model;
+ final Semaphore[] writePermissions = new Semaphore[nWriteThreads];
+ for (int i=0; i<nWriteThreads; i++) writePermissions[i] = new Semaphore(Integer.MAX_VALUE, false);
+
+ final Semaphore readPermission = new Semaphore(Integer.MAX_VALUE, false);
+
for (int i=0; i<nWriteThreads; i++) {
+ final int threadNum = i;
+
Thread thread = new Thread("WRITER"+i) {
Random rand = new Random(random.nextInt());
+ Semaphore writePermission = writePermissions[threadNum];
@Override
public void run() {
try {
while (operations.get() > 0) {
- int oper = rand.nextInt(100);
+ writePermission.acquire();
+
+ int oper = rand.nextInt(10);
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,Long> newCommittedModel;
+ Map<Integer,DocInfo> newCommittedModel;
long version;
synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
version = snapshotCount++;
- verbose("took snapshot version=",version);
}
- if (rand.nextInt(100) < softCommitPercent) {
- verbose("softCommit start");
- assertU(h.commit("softCommit","true"));
- verbose("softCommit end");
- } else {
- verbose("hardCommit start");
- assertU(commit());
- verbose("hardCommit end");
+ synchronized (stateChangeLock) {
+ // These commits won't take effect if we are in recovery mode,
+ // so change the version to -1 so we won't update our model.
+ if (uLog.getState() != UpdateLog.State.ACTIVE) version = -1;
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
}
synchronized(TestRealTimeGet.this) {
// install this model snapshot only if it's newer than the current one
+ // install this model only if we are not in recovery mode.
if (version >= committedModelClock) {
if (VERBOSE) {
verbose("installing new committedModel version="+committedModelClock);
@@ -245,8 +1104,13 @@ public class TestRealTimeGet extends Sol
}
- int id = rand.nextInt(ndocs);
- Object sync = syncArr[id];
+ int id;
+
+ if (rand.nextBoolean()) {
+ id = rand.nextInt(ndocs);
+ } else {
+ id = lastId; // reuse the last ID half of the time to force more race conditions
+ }
// set the lastId before we actually change it sometimes to try and
// uncover more race conditions between writing and reading
@@ -255,46 +1119,63 @@ public class TestRealTimeGet extends Sol
lastId = id;
}
- // We can't concurrently update the same document and retain our invariants of increasing values
- // since we can't guarantee what order the updates will be executed.
- synchronized (sync) {
- Long val = model.get(id);
- long nextVal = Math.abs(val)+1;
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ // the version we set on the update should determine who wins
+ // These versions are not derived from the actual leader update handler and hence this
+ // test may need to change depending on how we handle version numbers.
+ long version = testVersion.incrementAndGet();
+
+ // yield after getting the next version to increase the odds of updates happening out of order
+ if (rand.nextBoolean()) Thread.yield();
if (oper < commitPercent + deletePercent) {
- if (VERBOSE) {
- verbose("deleting id",id,"val=",nextVal);
+ verbose("deleting id",id,"val=",nextVal,"version",version);
+
+ Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // TODO: returning versions for these types of updates is redundant
+ // but if we do return, they had better be equal
+ if (returnedVersion != null) {
+ assertEquals(-version, returnedVersion.longValue());
}
- assertU("<delete><id>" + id + "</id></delete>");
- model.put(id, -nextVal);
- if (VERBOSE) {
- verbose("deleting id", id, "val=",nextVal,"DONE");
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
}
+
+ verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
- if (VERBOSE) {
- verbose("deleteByQuery id ",id, "val=",nextVal);
- }
- assertU("<delete><query>id:" + id + "</query></delete>");
- model.put(id, -nextVal);
- if (VERBOSE) {
- verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
- }
} else {
- if (VERBOSE) {
- verbose("adding id", id, "val=", nextVal);
+ verbose("adding id", id, "val=", nextVal,"version",version);
+
+ Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ if (returnedVersion != null) {
+ assertEquals(version, returnedVersion.longValue());
}
- assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
- model.put(id, nextVal);
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
if (VERBOSE) {
- verbose("adding id", id, "val=", nextVal,"DONE");
+ verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
}
}
- }
+ // } // end sync
if (!before) {
lastId = id;
@@ -319,7 +1200,11 @@ public class TestRealTimeGet extends Sol
@Override
public void run() {
try {
- while (operations.decrementAndGet() >= 0) {
+ while (operations.get() > 0) {
+ // throttle reads (don't completely stop)
+ readPermission.tryAcquire(10, TimeUnit.MILLISECONDS);
+
+
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
@@ -327,17 +1212,18 @@ public class TestRealTimeGet extends Sol
// so when querying, we should first check the model, and then the index
boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
- long val;
+ DocInfo info;
if (realTime) {
- val = model.get(id);
+ info = visibleModel.get(id);
} else {
synchronized(TestRealTimeGet.this) {
- val = committedModel.get(id);
+ info = committedModel.get(id);
}
}
- if (VERBOSE) {
+
+ if (VERBOSE) {
verbose("querying id", id);
}
SolrQueryRequest sreq;
@@ -355,9 +1241,11 @@ public class TestRealTimeGet extends Sol
} else {
assertEquals(1, doclist.size());
long foundVal = (Long)(((Map)doclist.get(0)).get(field));
- if (foundVal < Math.abs(val)) {
- verbose("ERROR, id", id, "foundVal=",foundVal,"model val=",val,"realTime=",realTime);
- assertTrue(foundVal >= Math.abs(val));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
}
}
}
@@ -378,6 +1266,68 @@ public class TestRealTimeGet extends Sol
thread.start();
}
+ int bufferedAddsApplied = 0;
+ do {
+ assertTrue(uLog.getState() == UpdateLog.State.ACTIVE);
+
+ // before we start buffering updates, we want to point
+ // visibleModel away from the live model.
+
+ visibleModel = new ConcurrentHashMap<Integer, DocInfo>(model);
+
+ synchronized (stateChangeLock) {
+ uLog.bufferUpdates();
+ }
+
+ assertTrue(uLog.getState() == UpdateLog.State.BUFFERING);
+
+ // sometimes wait for a second to allow time for writers to write something
+ if (random.nextBoolean()) Thread.sleep(random.nextInt(10)+1);
+
+ Future<UpdateLog.RecoveryInfo> recoveryInfoF = uLog.applyBufferedUpdates();
+ if (recoveryInfoF != null) {
+ UpdateLog.RecoveryInfo recInfo = null;
+
+ int writeThreadNumber = 0;
+ while (recInfo == null) {
+ try {
+ // wait a short period of time for recovery to complete (and to give a chance for more writers to concurrently add docs)
+ recInfo = recoveryInfoF.get(random.nextInt(100/nWriteThreads), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // idle one more write thread
+ verbose("Operation",operations.get(),"Draining permits for write thread",writeThreadNumber);
+ writePermissions[writeThreadNumber++].drainPermits();
+ if (writeThreadNumber >= nWriteThreads) {
+ // if we hit the end, back up and give a few write permits
+ writeThreadNumber--;
+ writePermissions[writeThreadNumber].release(random.nextInt(2) + 1);
+ }
+
+ // throttle readers so they don't steal too much CPU from the recovery thread
+ readPermission.drainPermits();
+ }
+ }
+
+ bufferedAddsApplied += recInfo.adds;
+ }
+
+ // put all writers back at full blast
+ for (Semaphore writePerm : writePermissions) {
+ // I don't think semaphores check for overflow, so we need to check how many remain
+ int neededPermits = Integer.MAX_VALUE - writePerm.availablePermits();
+ if (neededPermits > 0) writePerm.release( neededPermits );
+ }
+
+ // put back readers at full blast and point back to live model
+ visibleModel = model;
+ int neededPermits = Integer.MAX_VALUE - readPermission.availablePermits();
+ if (neededPermits > 0) readPermission.release( neededPermits );
+
+ verbose("ROUND=",operations.get());
+ } while (operations.decrementAndGet() > 0);
+
+ verbose("bufferedAddsApplied=",bufferedAddsApplied);
+
for (Thread thread : threads) {
thread.join();
}
@@ -387,6 +1337,10 @@ public class TestRealTimeGet extends Sol
+
+
+
+
// The purpose of this test is to roughly model how solr uses lucene
IndexReader reader;
@Test
@@ -394,13 +1348,13 @@ public class TestRealTimeGet extends Sol
final int commitPercent = 5 + random.nextInt(20);
final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
final int deletePercent = 4+random.nextInt(25);
- final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query
+ final int deleteByQueryPercent = 1+random.nextInt(5);
final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
int nWriteThreads = 5 + random.nextInt(25);
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
- final AtomicLong operations = new AtomicLong(10000); // number of query operations to perform in total - crank up if
+ final AtomicLong operations = new AtomicLong(1000); // number of query operations to perform in total - crank up if
int nReadThreads = 5 + random.nextInt(25);
final boolean tombstones = random.nextBoolean();
final boolean syncCommits = random.nextBoolean();
@@ -468,7 +1422,7 @@ public class TestRealTimeGet extends Sol
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,Long> newCommittedModel;
+ Map<Integer,DocInfo> newCommittedModel;
long version;
IndexReader oldReader;
@@ -485,7 +1439,7 @@ public class TestRealTimeGet extends Sol
if (reopenLock != null) reopenLock.lock();
synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
version = snapshotCount++;
oldReader = reader;
oldReader.incRef(); // increment the reference since we will use this for reopening
@@ -562,7 +1516,8 @@ public class TestRealTimeGet extends Sol
// We can't concurrently update the same document and retain our invariants of increasing values
// since we can't guarantee what order the updates will be executed.
synchronized (sync) {
- Long val = model.get(id);
+ DocInfo info = model.get(id);
+ long val = info.val;
long nextVal = Math.abs(val)+1;
if (oper < commitPercent + deletePercent) {
@@ -577,7 +1532,7 @@ public class TestRealTimeGet extends Sol
verbose("deleting id",id,"val=",nextVal);
writer.deleteDocuments(new Term("id",Integer.toString(id)));
- model.put(id, -nextVal);
+ model.put(id, new DocInfo(0,-nextVal));
verbose("deleting id",id,"val=",nextVal,"DONE");
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
@@ -594,7 +1549,7 @@ public class TestRealTimeGet extends Sol
verbose("deleteByQuery",id,"val=",nextVal);
writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
- model.put(id, -nextVal);
+ model.put(id, new DocInfo(0,-nextVal));
verbose("deleteByQuery",id,"val=",nextVal,"DONE");
} else {
// model.put(id, nextVal); // uncomment this and this test should fail.
@@ -612,7 +1567,7 @@ public class TestRealTimeGet extends Sol
verbose("deleting tombstone for id",id,"DONE");
}
- model.put(id, nextVal);
+ model.put(id, new DocInfo(0,nextVal));
verbose("adding id",id,"val=",nextVal,"DONE");
}
}
@@ -645,12 +1600,11 @@ public class TestRealTimeGet extends Sol
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
- long val;
-
+ DocInfo info;
synchronized(TestRealTimeGet.this) {
- val = committedModel.get(id);
+ info = committedModel.get(id);
}
-
+ long val = info.val;
IndexReader r;
synchronized(TestRealTimeGet.this) {
@@ -728,7 +1682,7 @@ public class TestRealTimeGet extends Sol
if (!termsEnum.seekExact(termBytes, false)) {
return -1;
}
- DocsEnum docs = _TestUtil.docs(random, termsEnum, MultiFields.getLiveDocs(r), null, false);
+ DocsEnum docs = termsEnum.docs(MultiFields.getLiveDocs(r), null, false);
int id = docs.nextDoc();
if (id != DocIdSetIterator.NO_MORE_DOCS) {
int next = docs.nextDoc();
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/AutoCommitTest.java Wed Jan 25 21:56:44 2012
@@ -258,5 +258,85 @@ public class AutoCommitTest extends Abst
assertQ("now it should", req("id:500") ,"//result[@numFound=1]" );
}
+
+ public void testCommitWithin() throws Exception {
+ SolrCore core = h.getCore();
+ NewSearcherListener trigger = new NewSearcherListener();
+ core.registerNewSearcherListener(trigger);
+ DirectUpdateHandler2 updater = (DirectUpdateHandler2) core.getUpdateHandler();
+ CommitTracker tracker = updater.commitTracker;
+ tracker.setTimeUpperBound(0);
+ tracker.setDocsUpperBound(-1);
+
+ XmlUpdateRequestHandler handler = new XmlUpdateRequestHandler();
+ handler.init( null );
+
+ MapSolrParams params = new MapSolrParams( new HashMap<String, String>() );
+
+ // Add a single document with commitWithin == 1 second
+ SolrQueryResponse rsp = new SolrQueryResponse();
+ SolrQueryRequestBase req = new SolrQueryRequestBase( core, params ) {};
+ req.setContentStreams( toContentStreams(
+ adoc(1000, "id", "529", "field_t", "what's inside?", "subject", "info"), null ) );
+ trigger.reset();
+ handler.handleRequest( req, rsp );
+
+ // Check it isn't in the index
+ assertQ("shouldn't find any", req("id:529") ,"//result[@numFound=0]" );
+
+ // Wait longer than the commitWithin time
+ assertTrue("commitWithin failed to commit", trigger.waitForNewSearcher(30000));
+
+ // Add one document without commitWithin
+ req.setContentStreams( toContentStreams(
+ adoc("id", "530", "field_t", "what's inside?", "subject", "info"), null ) );
+ trigger.reset();
+ handler.handleRequest( req, rsp );
+
+ // Check it isn't in the index
+ assertQ("shouldn't find any", req("id:530") ,"//result[@numFound=0]" );
+
+ // Delete one document with commitWithin
+ req.setContentStreams( toContentStreams(
+ delI("529", "commitWithin", "1000"), null ) );
+ trigger.reset();
+ handler.handleRequest( req, rsp );
+
+ // Now make sure we can find it
+ assertQ("should find one", req("id:529") ,"//result[@numFound=1]" );
+
+ // Wait for the commit to happen
+ assertTrue("commitWithin failed to commit", trigger.waitForNewSearcher(30000));
+
+ // Now we shouldn't find it
+ assertQ("should find none", req("id:529") ,"//result[@numFound=0]" );
+ // ... but we should find the new one
+ assertQ("should find one", req("id:530") ,"//result[@numFound=1]" );
+
+ trigger.reset();
+
+ // now make the call 10 times really fast and make sure it
+ // only commits once
+ req.setContentStreams( toContentStreams(
+ adoc(1000, "id", "500" ), null ) );
+ for( int i=0;i<10; i++ ) {
+ handler.handleRequest( req, rsp );
+ }
+ assertQ("should not be there yet", req("id:500") ,"//result[@numFound=0]" );
+
+ // the same for the delete
+ req.setContentStreams( toContentStreams(
+ delI("530", "commitWithin", "1000"), null ) );
+ for( int i=0;i<10; i++ ) {
+ handler.handleRequest( req, rsp );
+ }
+ assertQ("should be there", req("id:530") ,"//result[@numFound=1]" );
+
+ assertTrue("commitWithin failed to commit", trigger.waitForNewSearcher(30000));
+ assertQ("should be there", req("id:500") ,"//result[@numFound=1]" );
+ assertQ("should not be there", req("id:530") ,"//result[@numFound=0]" );
+
+ assertEquals(3, tracker.getCommitCount());
+ }
}
Modified: lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java (original)
+++ lucene/dev/branches/lucene2858/solr/core/src/test/org/apache/solr/update/SoftAutoCommitTest.java Wed Jan 25 21:56:44 2012
@@ -96,8 +96,8 @@ public class SoftAutoCommitTest extends
assertNotNull("soft529 wasn't fast enough", soft529);
monitor.assertSaneOffers();
- // check for the searcher, should have happend right after soft commit
- Long searcher529 = monitor.searcher.poll(softCommitWaitMillis, MILLISECONDS);
+ // check for the searcher, should have happened right after soft commit
+ Long searcher529 = monitor.searcher.poll(softCommitWaitMillis * 3, MILLISECONDS);
assertNotNull("searcher529 wasn't fast enough", searcher529);
monitor.assertSaneOffers();
@@ -118,7 +118,7 @@ public class SoftAutoCommitTest extends
// however slow the machine was to do the soft commit compared to expected,
// assume newSearcher had some magnitude of that much overhead as well
- long slowTestFudge = Math.max(100, 6 * (soft529 - add529 - softCommitWaitMillis));
+ long slowTestFudge = Math.max(200, 12 * (soft529 - add529 - softCommitWaitMillis));
assertTrue("searcher529 wasn't soon enough after soft529: " +
searcher529 + " !< " + soft529 + " + " + slowTestFudge + " (fudge)",
searcher529 < soft529 + slowTestFudge );
@@ -191,16 +191,16 @@ public class SoftAutoCommitTest extends
long postAdd529 = System.currentTimeMillis();
// wait for first hard/soft commit
- Long soft529 = monitor.soft.poll(softCommitWaitMillis * 2, MILLISECONDS);
+ Long soft529 = monitor.soft.poll(softCommitWaitMillis * 3, MILLISECONDS);
assertNotNull("soft529 wasn't fast enough", soft529);
Long manCommit = monitor.hard.poll(0, MILLISECONDS);
assertNotNull("manCommit wasn't fast enough", manCommit);
assertTrue("forced manCommit didn't happen when it should have: " +
- manCommit + " !< " + postAdd529,
- manCommit < postAdd529);
+ manCommit + " !<= " + postAdd529,
+ manCommit <= postAdd529);
- Long hard529 = monitor.hard.poll(hardCommitWaitMillis, MILLISECONDS);
+ Long hard529 = monitor.hard.poll(hardCommitWaitMillis * 2, MILLISECONDS);
assertNotNull("hard529 wasn't fast enough", hard529);
monitor.assertSaneOffers();
Modified: lucene/dev/branches/lucene2858/solr/example/solr/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/example/solr/conf/schema.xml?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/example/solr/conf/schema.xml (original)
+++ lucene/dev/branches/lucene2858/solr/example/solr/conf/schema.xml Wed Jan 25 21:56:44 2012
@@ -532,6 +532,9 @@
<field name="payloads" type="payloads" indexed="true" stored="true"/>
+
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+
<!-- Uncommenting the following will create a "timestamp" field using
a default value of "NOW" to indicate when each document was indexed.
-->
Modified: lucene/dev/branches/lucene2858/solr/example/solr/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/example/solr/conf/solrconfig.xml?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/example/solr/conf/solrconfig.xml (original)
+++ lucene/dev/branches/lucene2858/solr/example/solr/conf/solrconfig.xml Wed Jan 25 21:56:44 2012
@@ -344,11 +344,9 @@
<!-- Enables a transaction log, currently used for real-time get.
"dir" - the target directory for transaction logs, defaults to the
solr data directory. -->
- <!--
- <updateLog class="solr.FSUpdateLog">
+ <updateLog>
<str name="dir">${solr.data.dir:}</str>
</updateLog>
- -->
</updateHandler>
@@ -1072,6 +1070,13 @@
</lst>
</requestHandler>
-->
+
+ <!-- Solr Replication for SolrCloud Recovery
+
+ This is the config needed for SolrCloud's recovery replication.
+ -->
+ <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" />
+
<!-- Search Components
Modified: lucene/dev/branches/lucene2858/solr/example/solr/solr.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/example/solr/solr.xml?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/example/solr/solr.xml (original)
+++ lucene/dev/branches/lucene2858/solr/example/solr/solr.xml Wed Jan 25 21:56:44 2012
@@ -28,7 +28,7 @@
adminPath: RequestHandler path to manage cores.
If 'null' (or absent), cores will not be manageable via request handler
-->
- <cores adminPath="/admin/cores" defaultCoreName="collection1">
- <core name="collection1" instanceDir="." shard="shard1"/>
+ <cores adminPath="/admin/cores" defaultCoreName="collection1" host="${host:}" hostPort="${jetty.port:}">
+ <core name="collection1" instanceDir="." />
</cores>
</solr>
Modified: lucene/dev/branches/lucene2858/solr/lib/apache-solr-noggit-pom.xml.template
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/lib/apache-solr-noggit-pom.xml.template?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/lib/apache-solr-noggit-pom.xml.template (original)
+++ lucene/dev/branches/lucene2858/solr/lib/apache-solr-noggit-pom.xml.template Wed Jan 25 21:56:44 2012
@@ -31,6 +31,6 @@
<artifactId>solr-noggit</artifactId>
<name>Solr Specific Noggit</name>
<version>@version@</version>
- <description>Solr Specific Noggit r1209632</description>
+ <description>Solr Specific Noggit r1211150</description>
<packaging>jar</packaging>
</project>
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/SolrServer.java Wed Jan 25 21:56:44 2012
@@ -226,7 +226,22 @@ public abstract class SolrServer impleme
* @throws IOException
*/
public UpdateResponse deleteById(String id) throws SolrServerException, IOException {
- return new UpdateRequest().deleteById( id ).process( this );
+ return deleteById(id, -1);
+ }
+
+ /**
+ * Deletes a single document by unique ID, specifying max time before commit
+ * @param id the ID of the document to delete
+ * @param commitWithinMs max time (in ms) before a commit will happen
+ * @throws SolrServerException
+ * @throws IOException
+ * @since 3.6
+ */
+ public UpdateResponse deleteById(String id, int commitWithinMs) throws SolrServerException, IOException {
+ UpdateRequest req = new UpdateRequest();
+ req.deleteById(id);
+ req.setCommitWithin(commitWithinMs);
+ return req.process(this);
}
/**
@@ -236,7 +251,22 @@ public abstract class SolrServer impleme
* @throws IOException
*/
public UpdateResponse deleteById(List<String> ids) throws SolrServerException, IOException {
- return new UpdateRequest().deleteById( ids ).process( this );
+ return deleteById(ids, -1);
+ }
+
+ /**
+ * Deletes a list of documents by unique ID, specifying max time before commit
+ * @param ids the list of document IDs to delete
+ * @param commitWithinMs max time (in ms) before a commit will happen
+ * @throws SolrServerException
+ * @throws IOException
+ * @since 3.6
+ */
+ public UpdateResponse deleteById(List<String> ids, int commitWithinMs) throws SolrServerException, IOException {
+ UpdateRequest req = new UpdateRequest();
+ req.deleteById(ids);
+ req.setCommitWithin(commitWithinMs);
+ return req.process(this);
}
/**
@@ -246,7 +276,22 @@ public abstract class SolrServer impleme
* @throws IOException
*/
public UpdateResponse deleteByQuery(String query) throws SolrServerException, IOException {
- return new UpdateRequest().deleteByQuery( query ).process( this );
+ return deleteByQuery(query, -1);
+ }
+
+ /**
+ * Deletes documents from the index based on a query, specifying max time before commit
+ * @param query the query expressing what documents to delete
+ * @param commitWithinMs max time (in ms) before a commit will happen
+ * @throws SolrServerException
+ * @throws IOException
+ * @since 3.6
+ */
+ public UpdateResponse deleteByQuery(String query, int commitWithinMs) throws SolrServerException, IOException {
+ UpdateRequest req = new UpdateRequest();
+ req.deleteByQuery(query);
+ req.setCommitWithin(commitWithinMs);
+ return req.process(this);
}
/**
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Wed Jan 25 21:56:44 2012
@@ -28,16 +28,23 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
public class CloudSolrServer extends SolrServer {
@@ -48,12 +55,14 @@ public class CloudSolrServer extends Sol
private String defaultCollection;
private LBHttpSolrServer lbServer;
Random rand = new Random();
-
+ private MultiThreadedHttpConnectionManager connManager;
/**
* @param zkHost The address of the zookeeper quorum containing the cloud state
*/
public CloudSolrServer(String zkHost) throws MalformedURLException {
- this(zkHost, new LBHttpSolrServer());
+ connManager = new MultiThreadedHttpConnectionManager();
+ this.zkHost = zkHost;
+ this.lbServer = new LBHttpSolrServer(new HttpClient(connManager));
}
/**
@@ -88,42 +97,58 @@ public class CloudSolrServer extends Sol
* @throws InterruptedException
*/
public void connect() {
- if (zkStateReader != null) return;
- synchronized(this) {
- if (zkStateReader != null) return;
- try {
- ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
- zk.makeCollectionsNodeWatches();
- zk.makeShardZkNodeWatches(false);
- zk.updateCloudState(true);
- zkStateReader = zk;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (KeeperException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
- } catch (IOException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-
- } catch (TimeoutException e) {
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ if (zkStateReader == null) {
+ synchronized (this) {
+ if (zkStateReader == null) {
+ try {
+ ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout,
+ zkClientTimeout);
+ zk.createClusterStateWatchersAndUpdate();
+ zkStateReader = zk;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (KeeperException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (TimeoutException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
}
}
}
-
@Override
public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
connect();
+ // TODO: if you can hash here, you could favor the shard leader
+
CloudState cloudState = zkStateReader.getCloudState();
- String collection = request.getParams().get("collection", defaultCollection);
-
- // TODO: allow multiple collections to be specified via comma separated list
+ SolrParams reqParams = request.getParams();
+ if (reqParams == null) {
+ reqParams = new ModifiableSolrParams();
+ }
+ String collection = reqParams.get("collection", defaultCollection);
+
+ // Extract each comma separated collection name and store in a List.
+ List<String> collectionList = StrUtils.splitSmart(collection, ",", true);
+
+ // Retrieve slices from the cloud state and, for each collection specified,
+ // add it to the Map of slices.
+ Map<String,Slice> slices = new HashMap<String,Slice>();
+ for (int i = 0; i < collectionList.size(); i++) {
+ String coll= collectionList.get(i);
+ ClientUtils.appendMap(coll, slices, cloudState.getSlices(coll));
+ }
- Map<String,Slice> slices = cloudState.getSlices(collection);
Set<String> liveNodes = cloudState.getLiveNodes();
// IDEA: have versions on various things... like a global cloudState version
@@ -136,18 +161,21 @@ public class CloudSolrServer extends Sol
List<String> urlList = new ArrayList<String>();
for (Slice slice : slices.values()) {
for (ZkNodeProps nodeProps : slice.getShards().values()) {
- String node = nodeProps.get(ZkStateReader.NODE_NAME);
- if (!liveNodes.contains(node)) continue;
+ ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
+ String node = coreNodeProps.getNodeName();
+ if (!liveNodes.contains(coreNodeProps.getNodeName())
+ || !coreNodeProps.getState().equals(
+ ZkStateReader.ACTIVE)) continue;
if (nodes.put(node, nodeProps) == null) {
- String url = nodeProps.get(ZkStateReader.URL_PROP);
+ String url = coreNodeProps.getCoreUrl();
urlList.add(url);
}
}
}
Collections.shuffle(urlList, rand);
- // System.out.println("########################## MAKING REQUEST TO " + urlList);
- // TODO: set distrib=true if we detected more than one shard?
+ //System.out.println("########################## MAKING REQUEST TO " + urlList);
+
LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, urlList);
LBHttpSolrServer.Rsp rsp = lbServer.request(req);
return rsp.getResponse();
@@ -161,5 +189,12 @@ public class CloudSolrServer extends Sol
zkStateReader = null;
}
}
+ if (connManager != null) {
+ connManager.shutdown();
+ }
+ }
+
+ public LBHttpSolrServer getLbServer() {
+ return lbServer;
}
}
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CommonsHttpSolrServer.java Wed Jan 25 21:56:44 2012
@@ -139,6 +139,8 @@ public class CommonsHttpSolrServer exten
* with single-part requests.
*/
private boolean useMultiPartPost;
+
+ private boolean shutdownHttpClient = false;
/**
* @param solrServerUrl The URL of the Solr server. For
@@ -204,6 +206,7 @@ public class CommonsHttpSolrServer exten
}
if (client == null) {
+ shutdownHttpClient = true;
_httpClient = new HttpClient(new MultiThreadedHttpConnectionManager()) ;
// prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
@@ -669,4 +672,12 @@ public class CommonsHttpSolrServer exten
req.setCommitWithin(commitWithinMs);
return req.process(this);
}
+
+ public void shutdown() {
+ if (shutdownHttpClient && _httpClient != null
+ && _httpClient.getHttpConnectionManager() instanceof MultiThreadedHttpConnectionManager) {
+ ((MultiThreadedHttpConnectionManager) _httpClient
+ .getHttpConnectionManager()).shutdown();
+ }
+ }
}
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java Wed Jan 25 21:56:44 2012
@@ -28,6 +28,8 @@ import org.apache.solr.common.SolrExcept
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -249,16 +251,26 @@ public class LBHttpSolrServer extends So
rsp.rsp = server.request(req.getRequest());
return rsp; // SUCCESS
} catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- throw e;
+ // we retry on 404, 403, 503 or 500 - you can see this on solr shutdown
+ if (e.code() == 404 || e.code() == 403 || e.code() == 503 || e.code() == 500) {
+ ex = addZombie(server, e);
+ } else {
+ // Server is alive but the request was likely malformed or invalid
+ throw e;
+ }
+
+ // TODO: consider using below above - currently causes a problem with distrib updates:
+ // seems to match up against a failed forward to leader exception as well...
+ // || e.getMessage().contains("java.net.SocketException")
+ // || e.getMessage().contains("java.net.ConnectException")
+ } catch (SocketException e) {
+ ex = addZombie(server, e);
+ } catch (SocketTimeoutException e) {
+ ex = addZombie(server, e);
} catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
- ex = e;
- wrapper = new ServerWrapper(server);
- wrapper.lastUsed = System.currentTimeMillis();
- wrapper.standard = false;
- zombieServers.put(wrapper.getKey(), wrapper);
- startAliveCheckExecutor();
+ Throwable rootCause = e.getRootCause();
+ if (rootCause instanceof IOException) {
+ ex = addZombie(server, e);
} else {
throw e;
}
@@ -274,11 +286,23 @@ public class LBHttpSolrServer extends So
zombieServers.remove(wrapper.getKey());
return rsp; // SUCCESS
} catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- zombieServers.remove(wrapper.getKey());
- throw e;
+ // we retry on 404, 403, 503 or 500 - you can see this on solr shutdown
+ if (e.code() == 404 || e.code() == 403 || e.code() == 503 || e.code() == 500) {
+ ex = e;
+ // already a zombie, no need to re-add
+ } else {
+ // Server is alive but the request was malformed or invalid
+ zombieServers.remove(wrapper.getKey());
+ throw e;
+ }
+
+ } catch (SocketException e) {
+ ex = e;
+ } catch (SocketTimeoutException e) {
+ ex = e;
} catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
+ Throwable rootCause = e.getRootCause();
+ if (rootCause instanceof IOException) {
ex = e;
// already a zombie, no need to re-add
} else {
@@ -293,9 +317,22 @@ public class LBHttpSolrServer extends So
if (ex == null) {
throw new SolrServerException("No live SolrServers available to handle this request");
} else {
- throw new SolrServerException("No live SolrServers available to handle this request", ex);
+ throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
}
+ }
+
+ private Exception addZombie(CommonsHttpSolrServer server,
+ Exception e) {
+
+ ServerWrapper wrapper;
+
+ wrapper = new ServerWrapper(server);
+ wrapper.lastUsed = System.currentTimeMillis();
+ wrapper.standard = false;
+ zombieServers.put(wrapper.getKey(), wrapper);
+ startAliveCheckExecutor();
+ return e;
}
@@ -363,6 +400,12 @@ public class LBHttpSolrServer extends So
public void setSoTimeout(int timeout) {
httpClient.getParams().setSoTimeout(timeout);
}
+
+ public void shutdown() {
+ if (aliveCheckExecutor != null) {
+ aliveCheckExecutor.shutdownNow();
+ }
+ }
/**
* Tries to query a live server. A SolrServerException is thrown if all servers are dead.
Modified: lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java?rev=1235969&r1=1235968&r2=1235969&view=diff
==============================================================================
--- lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java (original)
+++ lucene/dev/branches/lucene2858/solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java Wed Jan 25 21:56:44 2012
@@ -20,8 +20,14 @@ package org.apache.solr.client.solrj.imp
import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -31,13 +37,10 @@ import org.apache.commons.httpclient.met
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,4 +303,33 @@ public class StreamingUpdateSolrServer e
{
log.error( "error", ex );
}
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ scheduler.shutdown();
+ try {
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+
+ public void shutdownNow() {
+ super.shutdown();
+ scheduler.shutdownNow(); // Cancel currently executing tasks
+ try {
+ if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
}