Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC
svn commit: r1235888 [9/12] - in /lucene/dev/trunk: dev-tools/eclipse/
dev-tools/maven/ solr/ solr/cloud-dev/
solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/
solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Wed Jan 25 19:49:26 2012
@@ -17,14 +17,6 @@
package org.apache.solr.search;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@@ -33,17 +25,32 @@ import org.apache.lucene.index.*;
import org.apache.lucene.search.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util._TestUtil;
import org.apache.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.SolrParams;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import static org.apache.solr.core.SolrCore.verbose;
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER;
public class TestRealTimeGet extends SolrTestCaseJ4 {
+ private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica)
+
@BeforeClass
public static void beforeClass() throws Exception {
@@ -59,10 +66,26 @@ public class TestRealTimeGet extends Sol
assertJQ(req("q","id:1")
,"/response/numFound==0"
);
- assertJQ(req("qt","/get","id","1")
+ assertJQ(req("qt","/get", "id","1", "fl","id")
,"=={'doc':{'id':'1'}}"
);
- assertJQ(req("qt","/get","ids","1")
+ assertJQ(req("qt","/get","ids","1", "fl","id")
+ ,"=={" +
+ " 'response':{'numFound':1,'start':0,'docs':[" +
+ " {" +
+ " 'id':'1'}]" +
+ " }}}"
+ );
+
+ assertU(commit());
+
+ assertJQ(req("q","id:1")
+ ,"/response/numFound==1"
+ );
+ assertJQ(req("qt","/get","id","1", "fl","id")
+ ,"=={'doc':{'id':'1'}}"
+ );
+ assertJQ(req("qt","/get","ids","1", "fl","id")
,"=={" +
" 'response':{'numFound':1,'start':0,'docs':[" +
" {" +
@@ -70,127 +93,939 @@ public class TestRealTimeGet extends Sol
" }}}"
);
- assertU(commit());
+ assertU(delI("1"));
+
+ assertJQ(req("q","id:1")
+ ,"/response/numFound==1"
+ );
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':null}"
+ );
+ assertJQ(req("qt","/get","ids","1")
+ ,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
+ );
+
+
+ assertU(adoc("id","10"));
+ assertU(adoc("id","11"));
+ assertJQ(req("qt","/get","id","10", "fl","id")
+ ,"=={'doc':{'id':'10'}}"
+ );
+ assertU(delQ("id:10 abcdef"));
+ assertJQ(req("qt","/get","id","10")
+ ,"=={'doc':null}"
+ );
+ assertJQ(req("qt","/get","id","11", "fl","id")
+ ,"=={'doc':{'id':'11'}}"
+ );
+
+
+ }
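
The assertions above pin down the real-time get contract: /get reflects uncommitted adds and deletes immediately, while ordinary queries (q=...) only see committed state. A minimal sketch of that contrast, reusing the test helpers above (the id value is illustrative):

    assertU(adoc("id","99"));
    assertJQ(req("qt","/get", "id","99", "fl","id")
        ,"=={'doc':{'id':'99'}}"    // visible via /get before any commit
    );
    assertJQ(req("q","id:99")
        ,"/response/numFound==0"    // not searchable until a commit
    );
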
+
+
+ @Test
+ public void testVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ long version = addAndGetVersion(sdoc("id","1") , null);
+
+ assertJQ(req("q","id:1")
+ ,"/response/numFound==0"
+ );
+
+ // test version is there from rtg
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // test version is there from the index
+ assertU(commit());
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate an update from the leader
+ version += 10;
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test version is there from rtg
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate reordering: test that an add with a lower version does not take effect
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate reordering: test that a delete with a lower version does not take effect
+ // TODO: also allow passing version on delete instead of on URL?
+ updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // make sure reordering detection also works after a commit
+ assertU(commit());
+
+ // simulate reordering: test that an add with a lower version does not take effect
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // simulate reordering: test that a delete with a lower version does not take effect
+ updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
+
+ // test that version hasn't changed
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':{'id':'1','_version_':" + version + "}}"
+ );
+
+ // now simulate a normal delete from the leader
+ version += 5;
+ updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version)));
+
+ // make sure a reordered add doesn't take effect.
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that it's still deleted
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':null}"
+ );
+
+ // test that we can remember the version of a delete after a commit
+ assertU(commit());
+
+ // make sure a reordered add doesn't take effect.
+ updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that it's still deleted
+ assertJQ(req("qt","/get","id","1")
+ ,"=={'doc':null}"
+ );
+
+ version = addAndGetVersion(sdoc("id","2"), null);
+ long version2 = deleteByQueryAndGetVersion("id:2", null);
+ assertTrue(Math.abs(version2) > version );
+
+ }
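
testVersions relies on a sign convention: addAndGetVersion returns a positive version, deleteAndGetVersion and deleteByQueryAndGetVersion return negative ones, and a reordered update only wins if its version is newer in absolute value. A hedged restatement of that rule (the helper name is hypothetical, not part of this commit):

    // Deletes carry negative versions, so magnitudes are compared;
    // anything at or below the current version is dropped as reordered.
    static boolean wins(long incomingVersion, long currentVersion) {
        return Math.abs(incomingVersion) > Math.abs(currentVersion);
    }

The stress tests below apply the same rule when deciding whether to install an update into their model.
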
+
+
+
+ /***
+ @Test
+ public void testGetRealtime() throws Exception {
+ SolrQueryRequest sr1 = req("q","foo");
+ IndexReader r1 = sr1.getCore().getRealtimeReader();
+
+ assertU(adoc("id","1"));
+
+ IndexReader r2 = sr1.getCore().getRealtimeReader();
+ assertNotSame(r1, r2);
+ int refcount = r2.getRefCount();
+
+ // make sure a new reader wasn't opened
+ IndexReader r3 = sr1.getCore().getRealtimeReader();
+ assertSame(r2, r3);
+ assertEquals(refcount+1, r3.getRefCount());
+
+ assertU(commit());
+
+ // this is not critical, but currently a commit does not refresh the reader
+ // if nothing has changed
+ IndexReader r4 = sr1.getCore().getRealtimeReader();
+ assertEquals(refcount+2, r4.getRefCount());
+
+
+ r1.decRef();
+ r2.decRef();
+ r3.decRef();
+ r4.decRef();
+ sr1.close();
+ }
+ ***/
+
+
+ final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<Integer,DocInfo>();
+ Map<Integer,DocInfo> committedModel = new HashMap<Integer,DocInfo>();
+ long snapshotCount;
+ long committedModelClock;
+ volatile int lastId;
+ final String field = "val_l";
+ Object[] syncArr;
+
+ private void initModel(int ndocs) {
+ snapshotCount = 0;
+ committedModelClock = 0;
+ lastId = 0;
+
+ syncArr = new Object[ndocs];
+
+ for (int i=0; i<ndocs; i++) {
+ model.put(i, new DocInfo(0, -1L));
+ syncArr[i] = new Object();
+ }
+ committedModel.putAll(model);
+ }
+
+
+ static class DocInfo {
+ long version;
+ long val;
+
+ public DocInfo(long version, long val) {
+ this.version = version;
+ this.val = val;
+ }
+
+ public String toString() {
+ return "{version="+version+",val="+val+"\"";
+ }
+ }
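
Here model is the live ground truth (id to latest version and value, with a negative val marking a delete) and committedModel lags behind, advancing only around commits. Every write thread below follows the same snapshot/install pattern; in isolation it looks like this (field names as in the test, the commit itself elided):

    Map<Integer,DocInfo> newCommittedModel;
    long version;
    synchronized (this) {
        newCommittedModel = new HashMap<Integer,DocInfo>(model); // snapshot first
        version = snapshotCount++;
    }
    // ... perform the hard or soft commit here ...
    synchronized (this) {
        if (version >= committedModelClock) { // install only newer snapshots
            committedModel = newCommittedModel;
            committedModelClock = version;
        }
    }
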
+
+ @Test
+ public void testStressGetRealtime() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out);
+
+ final int commitPercent = 5 + random.nextInt(20);
+ final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random.nextInt(25);
+ final int deleteByQueryPercent = 1+random.nextInt(5);
+ final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+ int nWriteThreads = 5 + random.nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 60;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 5 + random.nextInt(25);
+
+
+ verbose("commitPercent=", commitPercent);
+ verbose("softCommitPercent=",softCommitPercent);
+ verbose("deletePercent=",deletePercent);
+ verbose("deleteByQueryPercent=", deleteByQueryPercent);
+ verbose("ndocs=", ndocs);
+ verbose("nWriteThreads=", nWriteThreads);
+ verbose("nReadThreads=", nReadThreads);
+ verbose("percentRealtimeQuery=", percentRealtimeQuery);
+ verbose("maxConcurrentCommits=", maxConcurrentCommits);
+ verbose("operations=", operations);
+
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestRealTimeGet.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ verbose("took snapshot version=",version);
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
+
+ synchronized(TestRealTimeGet.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id = rand.nextInt(ndocs);
+ Object sync = syncArr[id];
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ // We can't concurrently update the same document and retain our invariants of increasing values
+ // since we can't guarantee what order the updates will be executed.
+ // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+ synchronized (sync) {
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ if (oper < commitPercent + deletePercent) {
+ if (VERBOSE) {
+ verbose("deleting id",id,"val=",nextVal);
+ }
+
+ // assertU("<delete><id>" + id + "</id></delete>");
+ Long version = deleteAndGetVersion(Integer.toString(id), null);
+
+ model.put(id, new DocInfo(version, -nextVal));
+ if (VERBOSE) {
+ verbose("deleting id", id, "val=",nextVal,"DONE");
+ }
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ if (VERBOSE) {
+ verbose("deleteByQuery id ",id, "val=",nextVal);
+ }
+
+ assertU("<delete><query>id:" + id + "</query></delete>");
+ model.put(id, new DocInfo(-1L, -nextVal));
+ if (VERBOSE) {
+ verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
+ }
+ } else {
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal);
+ }
+
+ // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+ Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)), null);
+ model.put(id, new DocInfo(version, nextVal));
+
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"DONE");
+ }
+
+ }
+ } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestRealTimeGet.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVal < Math.abs(info.val)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must match
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ }
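
The reader check in this test encodes a monotonicity invariant: writers update the index before the model, so any value found in the index must be at least as new as the model's, and when the versions are equal the values must agree. As a standalone predicate (an illustrative helper, not in the commit):

    static boolean consistent(long foundVal, long foundVer, DocInfo info) {
        return foundVal >= Math.abs(info.val)
            && (foundVer != info.version || foundVal == info.val);
    }
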
+
+
+ // This version doesn't synchronize on the id to tell which update won; it uses versions instead
+ @Test
+ public void testStressGetRealtimeVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ final int commitPercent = 5 + random.nextInt(20);
+ final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random.nextInt(25);
+ final int deleteByQueryPercent = 1 + random.nextInt(5);
+ final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+ int nWriteThreads = 5 + random.nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 75;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 5 + random.nextInt(25);
+
+
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestRealTimeGet.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
+
+ synchronized(TestRealTimeGet.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id = rand.nextInt(ndocs);
+ Object sync = syncArr[id];
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ // We can't concurrently update the same document and retain our invariants of increasing values
+ // since we can't guarantee what order the updates will be executed.
+ // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+ //
+ // NOTE: versioning means we can now remove the sync and tell what update "won"
+ // synchronized (sync) {
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ if (oper < commitPercent + deletePercent) {
+ verbose("deleting id",id,"val=",nextVal);
+
+ Long version = deleteAndGetVersion(Integer.toString(id), null);
+ assertTrue(version < 0);
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ verbose("deleting id", id, "val=",nextVal,"DONE");
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ verbose("deleteByQyery id",id,"val=",nextVal);
+
+ Long version = deleteByQueryAndGetVersion("id:"+Integer.toString(id), null);
+ assertTrue(version < 0);
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ verbose("deleteByQyery id", id, "val=",nextVal,"DONE");
+ } else {
+ verbose("adding id", id, "val=", nextVal);
+
+ // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+ Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)), null);
+ assertTrue(version > 0);
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"DONE");
+ }
+
+ }
+ // } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestRealTimeGet.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must match
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ }
+
+ // This version simulates updates coming from the leader and sometimes being reordered
+ @Test
+ public void testStressReorderVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ final int commitPercent = 5 + random.nextInt(20);
+ final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random.nextInt(25);
+ final int deleteByQueryPercent = 0; // delete-by-query can't be reordered on replicas
+ final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+ int nWriteThreads = 5 + random.nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 75;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 5 + random.nextInt(25);
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+
+ final AtomicLong testVersion = new AtomicLong(0);
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestRealTimeGet.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
+
+ synchronized(TestRealTimeGet.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id;
+
+ if (rand.nextBoolean()) {
+ id = rand.nextInt(ndocs);
+ } else {
+ id = lastId; // reuse the last ID half of the time to force more race conditions
+ }
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ DocInfo info = model.get(id);
- assertJQ(req("q","id:1")
- ,"/response/numFound==1"
- );
- assertJQ(req("qt","/get","id","1")
- ,"=={'doc':{'id':'1'}}"
- );
- assertJQ(req("qt","/get","ids","1")
- ,"=={" +
- " 'response':{'numFound':1,'start':0,'docs':[" +
- " {" +
- " 'id':'1'}]" +
- " }}}"
- );
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
- assertU(delI("1"));
+ // the version we set on the update should determine who wins
+ // These versions are not derived from the actual leader update handler, and hence this
+ // test may need to change depending on how we handle version numbers.
+ long version = testVersion.incrementAndGet();
- assertJQ(req("q","id:1")
- ,"/response/numFound==1"
- );
- assertJQ(req("qt","/get","id","1")
- ,"=={'doc':null}"
- );
- assertJQ(req("qt","/get","ids","1")
- ,"=={'response':{'numFound':0,'start':0,'docs':[]}}"
- );
+ // yield after getting the next version to increase the odds of updates happening out of order
+ if (rand.nextBoolean()) Thread.yield();
- }
+ if (oper < commitPercent + deletePercent) {
+ verbose("deleting id",id,"val=",nextVal,"version",version);
+ Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
- /***
- @Test
- public void testGetRealtime() throws Exception {
- SolrQueryRequest sr1 = req("q","foo");
- IndexReader r1 = sr1.getCore().getRealtimeReader();
+ // TODO: returning versions for these types of updates is redundant
+ // but if we do return, they had better be equal
+ if (returnedVersion != null) {
+ assertEquals(-version, returnedVersion.longValue());
+ }
- assertU(adoc("id","1"));
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
- IndexReader r2 = sr1.getCore().getRealtimeReader();
- assertNotSame(r1, r2);
- int refcount = r2.getRefCount();
+ verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
- // make sure a new reader wasn't opened
- IndexReader r3 = sr1.getCore().getRealtimeReader();
- assertSame(r2, r3);
- assertEquals(refcount+1, r3.getRefCount());
+ } else {
+ verbose("adding id", id, "val=", nextVal,"version",version);
- assertU(commit());
+ Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ if (returnedVersion != null) {
+ assertEquals(version, returnedVersion.longValue());
+ }
- // this is not critical, but currently a commit does not refresh the reader
- // if nothing has changed
- IndexReader r4 = sr1.getCore().getRealtimeReader();
- assertEquals(refcount+2, r4.getRefCount());
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
+ }
- r1.decRef();
- r2.decRef();
- r3.decRef();
- r4.decRef();
- sr1.close();
- }
- ***/
+ }
+ // } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+ threads.add(thread);
+ }
- final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
- Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
- long snapshotCount;
- long committedModelClock;
- volatile int lastId;
- final String field = "val_l";
- Object[] syncArr;
- private void initModel(int ndocs) {
- snapshotCount = 0;
- committedModelClock = 0;
- lastId = 0;
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random.nextInt());
- syncArr = new Object[ndocs];
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
- for (int i=0; i<ndocs; i++) {
- model.put(i, -1L);
- syncArr[i] = new Object();
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestRealTimeGet.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must match
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
}
- committedModel.putAll(model);
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
}
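
testStressReorderVersions drives the replica code path directly: each update carries an explicit _version_ (negated for deletes) plus the SEEN_LEADER parameter, so the processor treats it as forwarded from a leader rather than minting its own version. The idiom in miniature, as used above (id and value are illustrative):

    long version = testVersion.incrementAndGet();
    updateJ(jsonAdd(sdoc("id","42", "val_l","7",
                         "_version_",Long.toString(version))),
            params(SEEN_LEADER, SEEN_LEADER_VAL));
    deleteAndGetVersion("42",
            params("_version_",Long.toString(-(version + 1)),
                   SEEN_LEADER, SEEN_LEADER_VAL));
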
+ // This points to the live model when state is ACTIVE, but a snapshot of the
+ // past when recovering.
+ volatile ConcurrentHashMap<Integer,DocInfo> visibleModel;
+
+ // This version simulates updates coming from the leader and sometimes being reordered
+ // and tests the ability to buffer updates and apply them later
@Test
- public void testStressGetRealtime() throws Exception {
+ public void testStressRecovery() throws Exception {
clearIndex();
assertU(commit());
- // req().getCore().getUpdateHandler().getIndexWriterProvider().getIndexWriter(req().getCore()).setInfoStream(System.out);
-
- final int commitPercent = 5 + random.nextInt(20);
+ final int commitPercent = 5 + random.nextInt(10);
final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
final int deletePercent = 4+random.nextInt(25);
final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query
final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
- int nWriteThreads = 5 + random.nextInt(25);
+ int nWriteThreads = 2 + random.nextInt(10); // fewer write threads to give recovery thread more of a chance
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
// query variables
- final int percentRealtimeQuery = 60;
- // final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
- final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
- int nReadThreads = 5 + random.nextInt(25);
-
-
- verbose("commitPercent=", commitPercent);
- verbose("softCommitPercent=",softCommitPercent);
- verbose("deletePercent=",deletePercent);
- verbose("deleteByQueryPercent=", deleteByQueryPercent);
- verbose("ndocs=", ndocs);
- verbose("nWriteThreads=", nWriteThreads);
- verbose("nReadThreads=", nReadThreads);
- verbose("percentRealtimeQuery=", percentRealtimeQuery);
- verbose("maxConcurrentCommits=", maxConcurrentCommits);
- verbose("operations=", operations);
-
+ final int percentRealtimeQuery = 75;
+ final AtomicLong operations = new AtomicLong(atLeast(75)); // number of recovery loops to perform
+ int nReadThreads = 2 + random.nextInt(10); // fewer read threads to give writers more of a chance
initModel(ndocs);
@@ -198,39 +1033,63 @@ public class TestRealTimeGet extends Sol
List<Thread> threads = new ArrayList<Thread>();
+
+ final AtomicLong testVersion = new AtomicLong(0);
+
+
+ final UpdateHandler uHandler = h.getCore().getUpdateHandler();
+ final UpdateLog uLog = uHandler.getUpdateLog();
+ final VersionInfo vInfo = uLog.getVersionInfo();
+ final Object stateChangeLock = new Object();
+ this.visibleModel = model;
+ final Semaphore[] writePermissions = new Semaphore[nWriteThreads];
+ for (int i=0; i<nWriteThreads; i++) writePermissions[i] = new Semaphore(Integer.MAX_VALUE, false);
+
+ final Semaphore readPermission = new Semaphore(Integer.MAX_VALUE, false);
+
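
These semaphores throttle threads without stopping them outright: draining a thread's permits idles it, and releasing the shortfall back up to Integer.MAX_VALUE restores full speed without overflowing. The idiom in isolation, as used further down in this method:

    Semaphore perm = new Semaphore(Integer.MAX_VALUE, false);
    perm.drainPermits();                          // idle the thread
    int needed = Integer.MAX_VALUE - perm.availablePermits();
    if (needed > 0) perm.release(needed);         // back to full blast
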
for (int i=0; i<nWriteThreads; i++) {
+ final int threadNum = i;
+
Thread thread = new Thread("WRITER"+i) {
Random rand = new Random(random.nextInt());
+ Semaphore writePermission = writePermissions[threadNum];
@Override
public void run() {
try {
while (operations.get() > 0) {
- int oper = rand.nextInt(100);
+ writePermission.acquire();
+
+ int oper = rand.nextInt(10);
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,Long> newCommittedModel;
+ Map<Integer,DocInfo> newCommittedModel;
long version;
synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
version = snapshotCount++;
- verbose("took snapshot version=",version);
}
- if (rand.nextInt(100) < softCommitPercent) {
- verbose("softCommit start");
- assertU(h.commit("softCommit","true"));
- verbose("softCommit end");
- } else {
- verbose("hardCommit start");
- assertU(commit());
- verbose("hardCommit end");
+ synchronized (stateChangeLock) {
+ // These commits won't take effect if we are in recovery mode,
+ // so change the version to -1 so we won't update our model.
+ if (uLog.getState() != UpdateLog.State.ACTIVE) version = -1;
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
}
synchronized(TestRealTimeGet.this) {
// install this model snapshot only if it's newer than the current one
+ // install this model only if we are not in recovery mode.
if (version >= committedModelClock) {
if (VERBOSE) {
verbose("installing new committedModel version="+committedModelClock);
@@ -245,8 +1104,13 @@ public class TestRealTimeGet extends Sol
}
- int id = rand.nextInt(ndocs);
- Object sync = syncArr[id];
+ int id;
+
+ if (rand.nextBoolean()) {
+ id = rand.nextInt(ndocs);
+ } else {
+ id = lastId; // reuse the last ID half of the time to force more race conditions
+ }
// set the lastId before we actually change it sometimes to try and
// uncover more race conditions between writing and reading
@@ -255,46 +1119,63 @@ public class TestRealTimeGet extends Sol
lastId = id;
}
- // We can't concurrently update the same document and retain our invariants of increasing values
- // since we can't guarantee what order the updates will be executed.
- synchronized (sync) {
- Long val = model.get(id);
- long nextVal = Math.abs(val)+1;
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ // the version we set on the update should determine who wins
+ // These versions are not derived from the actual leader update handler, and hence this
+ // test may need to change depending on how we handle version numbers.
+ long version = testVersion.incrementAndGet();
+
+ // yield after getting the next version to increase the odds of updates happening out of order
+ if (rand.nextBoolean()) Thread.yield();
if (oper < commitPercent + deletePercent) {
- if (VERBOSE) {
- verbose("deleting id",id,"val=",nextVal);
+ verbose("deleting id",id,"val=",nextVal,"version",version);
+
+ Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // TODO: returning versions for these types of updates is redundant
+ // but if we do return, they had better be equal
+ if (returnedVersion != null) {
+ assertEquals(-version, returnedVersion.longValue());
}
- assertU("<delete><id>" + id + "</id></delete>");
- model.put(id, -nextVal);
- if (VERBOSE) {
- verbose("deleting id", id, "val=",nextVal,"DONE");
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
}
+
+ verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
- if (VERBOSE) {
- verbose("deleteByQuery id ",id, "val=",nextVal);
- }
- assertU("<delete><query>id:" + id + "</query></delete>");
- model.put(id, -nextVal);
- if (VERBOSE) {
- verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
- }
} else {
- if (VERBOSE) {
- verbose("adding id", id, "val=", nextVal);
+ verbose("adding id", id, "val=", nextVal,"version",version);
+
+ Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ if (returnedVersion != null) {
+ assertEquals(version, returnedVersion.longValue());
}
- assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
- model.put(id, nextVal);
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
if (VERBOSE) {
- verbose("adding id", id, "val=", nextVal,"DONE");
+ verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
}
}
- }
+ // } // end sync
if (!before) {
lastId = id;
@@ -319,7 +1200,11 @@ public class TestRealTimeGet extends Sol
@Override
public void run() {
try {
- while (operations.decrementAndGet() >= 0) {
+ while (operations.get() > 0) {
+ // throttle reads (don't completely stop)
+ readPermission.tryAcquire(10, TimeUnit.MILLISECONDS);
+
+
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
@@ -327,17 +1212,18 @@ public class TestRealTimeGet extends Sol
// so when querying, we should first check the model, and then the index
boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
- long val;
+ DocInfo info;
if (realTime) {
- val = model.get(id);
+ info = visibleModel.get(id);
} else {
synchronized(TestRealTimeGet.this) {
- val = committedModel.get(id);
+ info = committedModel.get(id);
}
}
- if (VERBOSE) {
+
+ if (VERBOSE) {
verbose("querying id", id);
}
SolrQueryRequest sreq;
@@ -355,9 +1241,11 @@ public class TestRealTimeGet extends Sol
} else {
assertEquals(1, doclist.size());
long foundVal = (Long)(((Map)doclist.get(0)).get(field));
- if (foundVal < Math.abs(val)) {
- verbose("ERROR, id", id, "foundVal=",foundVal,"model val=",val,"realTime=",realTime);
- assertTrue(foundVal >= Math.abs(val));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must match
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
}
}
}
@@ -378,6 +1266,68 @@ public class TestRealTimeGet extends Sol
thread.start();
}
+ int bufferedAddsApplied = 0;
+ do {
+ assertTrue(uLog.getState() == UpdateLog.State.ACTIVE);
+
+ // before we start buffering updates, we want to point
+ // visibleModel away from the live model.
+
+ visibleModel = new ConcurrentHashMap<Integer, DocInfo>(model);
+
+ synchronized (stateChangeLock) {
+ uLog.bufferUpdates();
+ }
+
+ assertTrue(uLog.getState() == UpdateLog.State.BUFFERING);
+
+ // sometimes wait a few milliseconds to give writers time to write something
+ if (random.nextBoolean()) Thread.sleep(random.nextInt(10)+1);
+
+ Future<UpdateLog.RecoveryInfo> recoveryInfoF = uLog.applyBufferedUpdates();
+ if (recoveryInfoF != null) {
+ UpdateLog.RecoveryInfo recInfo = null;
+
+ int writeThreadNumber = 0;
+ while (recInfo == null) {
+ try {
+ // wait a short period of time for recovery to complete (and to give a chance for more writers to concurrently add docs)
+ recInfo = recoveryInfoF.get(random.nextInt(100/nWriteThreads), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // idle one more write thread
+ verbose("Operation",operations.get(),"Draining permits for write thread",writeThreadNumber);
+ writePermissions[writeThreadNumber++].drainPermits();
+ if (writeThreadNumber >= nWriteThreads) {
+ // if we hit the end, back up and give a few write permits
+ writeThreadNumber--;
+ writePermissions[writeThreadNumber].release(random.nextInt(2) + 1);
+ }
+
+ // throttle readers so they don't steal too much CPU from the recovery thread
+ readPermission.drainPermits();
+ }
+ }
+
+ bufferedAddsApplied += recInfo.adds;
+ }
+
+ // put all writers back at full blast
+ for (Semaphore writePerm : writePermissions) {
+ // I don't think semaphores check for overflow, so we need to check how many remain
+ int neededPermits = Integer.MAX_VALUE - writePerm.availablePermits();
+ if (neededPermits > 0) writePerm.release( neededPermits );
+ }
+
+ // put back readers at full blast and point back to live model
+ visibleModel = model;
+ int neededPermits = Integer.MAX_VALUE - readPermission.availablePermits();
+ if (neededPermits > 0) readPermission.release( neededPermits );
+
+ verbose("ROUND=",operations.get());
+ } while (operations.decrementAndGet() > 0);
+
+ verbose("bufferedAddsApplied=",bufferedAddsApplied);
+
for (Thread thread : threads) {
thread.join();
}
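
The recovery loop above walks the UpdateLog state machine this commit introduces: ACTIVE, then bufferUpdates() moves to BUFFERING, applyBufferedUpdates() to APPLYING_BUFFERED, and completion of the returned Future back to ACTIVE. Reduced to its skeleton (throttling and error handling omitted):

    uLog.bufferUpdates();                              // State.BUFFERING
    Future<UpdateLog.RecoveryInfo> f = uLog.applyBufferedUpdates();
    if (f != null) {                                   // null when nothing was buffered
        UpdateLog.RecoveryInfo info = f.get();         // State.ACTIVE again
        verbose("replayed adds=", info.adds);
    }
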
@@ -387,6 +1337,10 @@ public class TestRealTimeGet extends Sol
+
+
+
+
// The purpose of this test is to roughly model how solr uses lucene
IndexReader reader;
@Test
@@ -394,13 +1348,13 @@ public class TestRealTimeGet extends Sol
final int commitPercent = 5 + random.nextInt(20);
final int softCommitPercent = 30+random.nextInt(75); // what percent of the commits are soft
final int deletePercent = 4+random.nextInt(25);
- final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query
+ final int deleteByQueryPercent = 1+random.nextInt(5);
final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
int nWriteThreads = 5 + random.nextInt(25);
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
- final AtomicLong operations = new AtomicLong(10000); // number of query operations to perform in total - crank up if
+ final AtomicLong operations = new AtomicLong(1000); // number of query operations to perform in total - crank up if
int nReadThreads = 5 + random.nextInt(25);
final boolean tombstones = random.nextBoolean();
final boolean syncCommits = random.nextBoolean();
@@ -468,7 +1422,7 @@ public class TestRealTimeGet extends Sol
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,Long> newCommittedModel;
+ Map<Integer,DocInfo> newCommittedModel;
long version;
IndexReader oldReader;
@@ -485,7 +1439,7 @@ public class TestRealTimeGet extends Sol
if (reopenLock != null) reopenLock.lock();
synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
version = snapshotCount++;
oldReader = reader;
oldReader.incRef(); // increment the reference since we will use this for reopening
@@ -562,7 +1516,8 @@ public class TestRealTimeGet extends Sol
// We can't concurrently update the same document and retain our invariants of increasing values
// since we can't guarantee what order the updates will be executed.
synchronized (sync) {
- Long val = model.get(id);
+ DocInfo info = model.get(id);
+ long val = info.val;
long nextVal = Math.abs(val)+1;
if (oper < commitPercent + deletePercent) {
@@ -577,7 +1532,7 @@ public class TestRealTimeGet extends Sol
verbose("deleting id",id,"val=",nextVal);
writer.deleteDocuments(new Term("id",Integer.toString(id)));
- model.put(id, -nextVal);
+ model.put(id, new DocInfo(0,-nextVal));
verbose("deleting id",id,"val=",nextVal,"DONE");
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
@@ -594,7 +1549,7 @@ public class TestRealTimeGet extends Sol
verbose("deleteByQuery",id,"val=",nextVal);
writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
- model.put(id, -nextVal);
+ model.put(id, new DocInfo(0,-nextVal));
verbose("deleteByQuery",id,"val=",nextVal,"DONE");
} else {
// model.put(id, nextVal); // uncomment this and this test should fail.
@@ -612,7 +1567,7 @@ public class TestRealTimeGet extends Sol
verbose("deleting tombstone for id",id,"DONE");
}
- model.put(id, nextVal);
+ model.put(id, new DocInfo(0,nextVal));
verbose("adding id",id,"val=",nextVal,"DONE");
}
}
@@ -645,12 +1600,11 @@ public class TestRealTimeGet extends Sol
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
- long val;
-
+ DocInfo info;
synchronized(TestRealTimeGet.this) {
- val = committedModel.get(id);
+ info = committedModel.get(id);
}
-
+ long val = info.val;
IndexReader r;
synchronized(TestRealTimeGet.this) {
@@ -728,7 +1682,7 @@ public class TestRealTimeGet extends Sol
if (!termsEnum.seekExact(termBytes, false)) {
return -1;
}
- DocsEnum docs = _TestUtil.docs(random, termsEnum, MultiFields.getLiveDocs(r), null, false);
+ DocsEnum docs = termsEnum.docs(MultiFields.getLiveDocs(r), null, false);
int id = docs.nextDoc();
if (id != DocIdSetIterator.NO_MORE_DOCS) {
int next = docs.nextDoc();
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+
+import org.apache.noggit.JSONUtil;
+import org.apache.noggit.ObjectBuilder;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.DirectUpdateHandler2;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.*;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER;
+
+public class TestRecovery extends SolrTestCaseJ4 {
+ private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica)
+ private static int timeout=60; // acquire timeout in seconds. change this to a huge number when debugging to prevent threads from advancing.
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ initCore("solrconfig-tlog.xml","schema12.xml");
+ }
+
+ @Test
+ public void testLogReplay() throws Exception {
+ try {
+
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+
+ clearIndex();
+ assertU(commit());
+
+ Deque<Long> versions = new ArrayDeque<Long>();
+ versions.addFirst(addAndGetVersion(sdoc("id", "1"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "11"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "12"), null));
+ versions.addFirst(deleteByQueryAndGetVersion("id:11", null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "13"), null));
+
+ assertJQ(req("q","*:*"),"/response/numFound==0");
+
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
+
+ h.close();
+ createCore();
+ // Solr should kick this off now
+ // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
+
+ // verify that previous close didn't do a commit
+ // recovery should be blocked by our hook
+ assertJQ(req("q","*:*") ,"/response/numFound==0");
+
+ // make sure we can still access versions after a restart
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);
+
+ // unblock recovery
+ logReplay.release(1000);
+
+ // make sure we can still access versions during recovery
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);
+
+ // wait until recovery has finished
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+
+ assertJQ(req("q","*:*") ,"/response/numFound==3");
+
+ // make sure we can still access versions after recovery
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
+
+ assertU(adoc("id","2"));
+ assertU(adoc("id","3"));
+ assertU(delI("2"));
+ assertU(adoc("id","4"));
+
+ assertJQ(req("q","*:*") ,"/response/numFound==3");
+
+ h.close();
+ createCore();
+ // Solr should kick this off now
+ // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
+
+ // wait until recovery has finished
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+ assertJQ(req("q","*:*") ,"/response/numFound==5");
+ assertJQ(req("q","id:2") ,"/response/numFound==0");
+
+ // no updates, so ensure that recovery does not run
+ h.close();
+ int permits = logReplay.availablePermits();
+ createCore();
+ // Solr should kick this off now
+ // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
+
+ assertJQ(req("q","*:*") ,"/response/numFound==5");
+ Thread.sleep(100);
+ assertEquals(permits, logReplay.availablePermits()); // no updates, so ensure that recovery didn't run
+
+ assertEquals(UpdateLog.State.ACTIVE, h.getCore().getUpdateHandler().getUpdateLog().getState());
+
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+ }
+
+ }
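
The two hooks installed at the top of this test give it fine-grained control over replay: testing_logReplayHook blocks inside recovery until the test releases permits (testBuffering below releases a single permit to apply a single update), and testing_logReplayFinishHook signals completion. Usage in miniature:

    logReplay.release(1);     // let replay make one step of progress
    logReplay.release(1000);  // then let the rest of the replay run
    assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS)); // wait for it
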
+
+ @Test
+ public void testBuffering() throws Exception {
+
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+
+ SolrQueryRequest req = req();
+ UpdateHandler uhandler = req.getCore().getUpdateHandler();
+ UpdateLog ulog = uhandler.getUpdateLog();
+
+ try {
+ clearIndex();
+ assertU(commit());
+
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+ Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
+ assertTrue(rinfoFuture == null);
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+ // simulate updates from a leader
+ updateJ(jsonAdd(sdoc("id","1", "_version_","1010")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","11", "_version_","1015")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonDelQ("id:1 id:11 id:2 id:3"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-1017"));
+ updateJ(jsonAdd(sdoc("id","2", "_version_","1020")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","3", "_version_","1030")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ deleteAndGetVersion("1", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-2010"));
+
+ assertJQ(req("qt","/get", "getVersions","6")
+ ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
+ );
+
+ assertU(commit());
+
+ assertJQ(req("qt","/get", "getVersions","6")
+ ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
+ );
+
+ // updates should be buffered, so we should not see any results yet.
+ assertJQ(req("q", "*:*")
+ , "/response/numFound==0"
+ );
+
+ // real-time get should also not show anything (this could change in the future,
+ // but it's currently used for validating version numbers too, so it would
+ // be bad for updates to be visible if we're just buffering).
+ assertJQ(req("qt","/get", "id","3")
+ ,"=={'doc':null}"
+ );
+
+
+ rinfoFuture = ulog.applyBufferedUpdates();
+ assertTrue(rinfoFuture != null);
+
+ assertEquals(UpdateLog.State.APPLYING_BUFFERED, ulog.getState());
+
+ logReplay.release(1000);
+
+ UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+
+ assertJQ(req("qt","/get", "getVersions","6")
+ ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
+ );
+
+
+ assertJQ(req("q", "*:*")
+ , "/response/numFound==2"
+ );
+
+ // move back to recovering
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+ Long ver = getVer(req("qt","/get", "id","3"));
+ assertEquals(1030L, ver.longValue());
+
+ // add a reordered doc that shouldn't overwrite one in the index
+ updateJ(jsonAdd(sdoc("id","3", "_version_","3")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // reorder two buffered updates
+ updateJ(jsonAdd(sdoc("id","4", "_version_","1040")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ deleteAndGetVersion("4", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-940")); // this update should not take affect
+ updateJ(jsonAdd(sdoc("id","6", "_version_","1060")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","5", "_version_","1050")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","8", "_version_","1080")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // test that delete by query is at least buffered along with everything else so it will delete the
+ // currently buffered id:8 (even if it doesn't currently support versioning)
+ updateJ("{\"delete\": { \"query\":\"id:2 OR id:8\" }}", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-3000"));
+
+ assertJQ(req("qt","/get", "getVersions","13")
+ ,"=={'versions':[-3000,1080,1050,1060,-940,1040,3,-2010,1030,1020,-1017,1015,1010]}" // the "3" appears because versions aren't checked while buffering
+ );
+
+ logReplay.drainPermits();
+ rinfoFuture = ulog.applyBufferedUpdates();
+ assertNotNull(rinfoFuture);
+ assertEquals(UpdateLog.State.APPLYING_BUFFERED, ulog.getState());
+
+ // apply a single update
+ logReplay.release(1);
+
+ // now add another update
+ updateJ(jsonAdd(sdoc("id","7", "_version_","1070")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // a reordered update that should be dropped
+ deleteAndGetVersion("5", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-950"));
+
+ deleteAndGetVersion("6", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-2060"));
+
+ logReplay.release(1000);
+ UpdateLog.RecoveryInfo recInfo = rinfoFuture.get();
+
+ assertJQ(req("q", "*:*", "sort","id asc", "fl","id,_version_")
+ , "/response/docs==["
+ + "{'id':'3','_version_':1030}"
+ + ",{'id':'4','_version_':1040}"
+ + ",{'id':'5','_version_':1050}"
+ + ",{'id':'7','_version_':1070}"
+ +"]"
+ );
+
+ assertEquals(1, recInfo.deleteByQuery);
+
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+
+ req.close();
+ }
+
+ }
+
+
+ @Test
+ public void testDropBuffered() throws Exception {
+
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+
+ SolrQueryRequest req = req();
+ UpdateHandler uhandler = req.getCore().getUpdateHandler();
+ UpdateLog ulog = uhandler.getUpdateLog();
+
+ try {
+ clearIndex();
+ assertU(commit());
+
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+ Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
+ assertNull(rinfoFuture);
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
+
+ ulog.bufferUpdates();
+ assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
+
+ // simulate updates from a leader
+ updateJ(jsonAdd(sdoc("id","1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
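+ // drop the buffered updates; none of them should ever make it into the index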
+ assertTrue(ulog.dropBufferedUpdates());
+ ulog.bufferUpdates();
+ updateJ(jsonAdd(sdoc("id", "4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id", "5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ logReplay.release(1000);
+ rinfoFuture = ulog.applyBufferedUpdates();
+ UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
+ assertEquals(2, rinfo.adds);
+
+ assertJQ(req("qt","/get", "getVersions","2")
+ ,"=={'versions':[105,104]}"
+ );
+
+ // this time add some docs first before buffering starts (so tlog won't be at pos 0)
+ updateJ(jsonAdd(sdoc("id","100", "_version_","200")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","101", "_version_","201")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ ulog.bufferUpdates();
+ updateJ(jsonAdd(sdoc("id","103", "_version_","203")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","104", "_version_","204")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ assertTrue(ulog.dropBufferedUpdates());
+ ulog.bufferUpdates();
+ updateJ(jsonAdd(sdoc("id","105", "_version_","205")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","106", "_version_","206")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ rinfoFuture = ulog.applyBufferedUpdates();
+ rinfo = rinfoFuture.get();
+ assertEquals(2, rinfo.adds);
+
+ assertJQ(req("q", "*:*", "sort","_version_ asc", "fl","id,_version_")
+ , "/response/docs==["
+ + "{'id':'4','_version_':104}"
+ + ",{'id':'5','_version_':105}"
+ + ",{'id':'100','_version_':200}"
+ + ",{'id':'101','_version_':201}"
+ + ",{'id':'105','_version_':205}"
+ + ",{'id':'106','_version_':206}"
+ +"]"
+ );
+
+ assertJQ(req("qt","/get", "getVersions","6")
+ ,"=={'versions':[206,205,201,200,105,104]}"
+ );
+
+
+ assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+
+ req.close();
+ }
+
+ }
+
+
+ // make sure that on a restart, versions don't start too low
+ @Test
+ public void testVersionsOnRestart() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ assertU(adoc("id","1", "val_i","1"));
+ assertU(adoc("id","2", "val_i","1"));
+ assertU(commit());
+ long v1 = getVer(req("q","id:1"));
+ long v1a = getVer(req("q","id:2"));
+
+ h.close();
+ createCore();
+
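+ // after the restart, newly assigned versions must still be greater than pre-restart versions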
+ assertU(adoc("id","1", "val_i","2"));
+ assertU(commit());
+ long v2 = getVer(req("q","id:1"));
+
+ assert(v2 > v1);
+
+ assertJQ(req("qt","/get", "getVersions","2")
+ ,"/versions==[" + v2 + "," + v1a + "]"
+ );
+
+ }
+
+
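+ // add nDocs documents, recording their versions newest-first to match the order getVersions returns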
+ private void addDocs(int nDocs, int start, LinkedList<Long> versions) throws Exception {
+ for (int i=0; i<nDocs; i++) {
+ versions.addFirst( addAndGetVersion( sdoc("id",Integer.toString(start + i)) , null) );
+ }
+ }
+
+ @Test
+ public void testRemoveOldLogs() throws Exception {
+ try {
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+
+ clearIndex();
+ assertU(commit());
+
+ File logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();
+
+ h.close();
+
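+ // delete any existing tlog files so the test starts from a known, empty log directory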
+ String[] files = UpdateLog.getLogList(logDir);
+ for (String file : files) {
+ new File(logDir, file).delete();
+ }
+
+ assertEquals(0, UpdateLog.getLogList(logDir).length);
+
+ createCore();
+
+ int start = 0;
+ int maxReq = 50;
+
+ LinkedList<Long> versions = new LinkedList<Long>();
+ addDocs(10, start, versions); start+=10;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ addDocs(10, start, versions); start+=10;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ assertEquals(2, UpdateLog.getLogList(logDir).length);
+
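+ // 105 more updates should exceed what the update log needs to retain, making the two older logs expendable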
+ addDocs(105, start, versions); start+=105;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ // previous two logs should be gone now
+ assertEquals(1, UpdateLog.getLogList(logDir).length);
+
+ addDocs(1, start, versions); start+=1;
+ h.close();
+ createCore(); // trigger recovery, make sure that tlog reference handling is correct
+
+ // test we can get versions while replay is happening
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ logReplay.release(1000);
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ addDocs(105, start, versions); start+=105;
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+ assertU(commit());
+ assertJQ(req("qt","/get", "getVersions",""+maxReq), "/versions==" + versions.subList(0,Math.min(maxReq,start)));
+
+ // previous logs should be gone now
+ assertEquals(1, UpdateLog.getLogList(logDir).length);
+
+ //
+ // test that a corrupt tlog file doesn't stop us from coming up, or seeing versions before that tlog file.
+ //
+ addDocs(1, start, new LinkedList<Long>()); // don't add this to the versions list because we are going to lose it...
+ h.close();
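+ // trash the newest tlog from the very start of the file so it can't be opened at all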
+ files = UpdateLog.getLogList(logDir);
+ Arrays.sort(files);
+ RandomAccessFile raf = new RandomAccessFile(new File(logDir, files[files.length-1]), "rw");
+ raf.writeChars("This is a trashed log file that really shouldn't work at all, but we'll see...");
+ raf.close();
+
+ ignoreException("Failure to open existing");
+ createCore();
+ // we should still be able to get the list of versions (not including the trashed log file)
+ assertJQ(req("qt", "/get", "getVersions", "" + maxReq), "/versions==" + versions.subList(0, Math.min(maxReq, start)));
+ resetExceptionIgnores();
+
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+ }
+ }
+
+ //
+ // test that a partially written last tlog entry (that will cause problems for both reverse reading and for
+ // log replay) doesn't stop us from coming up, or from recovering the documents that were not cut off.
+ //
+ @Test
+ public void testTruncatedLog() throws Exception {
+ try {
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+
+ UpdateLog.testing_logReplayHook = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ UpdateLog.testing_logReplayFinishHook = new Runnable() {
+ @Override
+ public void run() {
+ logReplayFinish.release();
+ }
+ };
+
+ File logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();
+
+ clearIndex();
+ assertU(commit());
+
+ assertU(adoc("id","1"));
+ assertU(adoc("id","2"));
+ assertU(adoc("id","3"));
+
+ h.close();
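+ // append a bogus, partially written record to the end of the newest tlog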
+ String[] files = UpdateLog.getLogList(logDir);
+ Arrays.sort(files);
+ RandomAccessFile raf = new RandomAccessFile(new File(logDir, files[files.length-1]), "rw");
+ raf.seek(raf.length()); // seek to end
+ raf.writeLong(0xffffffffffffffffL);
+ raf.writeChars("This should be appended to a good log file, representing a bad partially written record.");
+ raf.close();
+
+ logReplay.release(1000);
+ logReplayFinish.drainPermits();
+ ignoreException("OutOfBoundsException"); // this is what the corrupted log currently produces... subject to change.
+ createCore();
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+ resetExceptionIgnores();
+ assertJQ(req("q","*:*") ,"/response/numFound==3");
+
+ //
+ // Now test that the bad log file doesn't mess up retrieving latest versions
+ //
+
+ updateJ(jsonAdd(sdoc("id","4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+ updateJ(jsonAdd(sdoc("id","6", "_version_","106")), params(SEEN_LEADER,SEEN_LEADER_VAL));
+
+ // getVersions currently skips the bad log file, but it would also return the version of the
+ // clearIndex (del *:*), so ask for only the three versions added above.
+ // assertJQ(req("qt","/get", "getVersions","6"), "/versions==[106,105,104]");
+ assertJQ(req("qt","/get", "getVersions","3"), "/versions==[106,105,104]");
+
+ } finally {
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+ }
+ }
+
+
+
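+ // pull the _version_ field out of a JSON response, handling /get single-doc, multi-doc, and standard query shapes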
+ private static Long getVer(SolrQueryRequest req) throws Exception {
+ String response = JQ(req);
+ Map rsp = (Map) ObjectBuilder.fromJSON(response);
+ Map doc = null;
+ if (rsp.containsKey("doc")) {
+ doc = (Map)rsp.get("doc");
+ } else if (rsp.containsKey("docs")) {
+ List lst = (List)rsp.get("docs");
+ if (lst.size() > 0) {
+ doc = (Map)lst.get(0);
+ }
+ } else if (rsp.containsKey("response")) {
+ Map responseMap = (Map)rsp.get("response");
+ List lst = (List)responseMap.get("docs");
+ if (lst.size() > 0) {
+ doc = (Map)lst.get(0);
+ }
+ }
+
+ if (doc == null) return null;
+
+ return (Long)doc.get("_version_");
+ }
+}
+