You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/10/10 17:12:32 UTC
svn commit: r1181011 - in /lucene/dev/branches/solrcloud/solr:
core/src/test-files/solr/conf/schema12.xml
core/src/test/org/apache/solr/search/TestRealTimeGet.java
test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
Author: yonik
Date: Mon Oct 10 15:12:31 2011
New Revision: 1181011
URL: http://svn.apache.org/viewvc?rev=1181011&view=rev
Log:
SOLR-2816: test versions via realtime get
Modified:
lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml?rev=1181011&r1=1181010&r2=1181011&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml Mon Oct 10 15:12:31 2011
@@ -530,6 +530,11 @@
<field name="uniq3" type="string" indexed="true" stored="true"/>
<field name="nouniq" type="string" indexed="true" stored="true" multiValued="true"/>
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+
+
+
<dynamicField name="*_coordinate" type="tdouble" indexed="true" stored="false"/>
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1181011&r1=1181010&r2=1181011&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Mon Oct 10 15:12:31 2011
@@ -20,6 +20,7 @@ package org.apache.solr.search;
import org.apache.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.request.SolrQueryRequest;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -48,10 +49,10 @@ public class TestRealTimeGet extends Sol
assertJQ(req("q","id:1")
,"/response/numFound==0"
);
- assertJQ(req("qt","/get","id","1")
+ assertJQ(req("qt","/get", "id","1", "fl","id")
,"=={'doc':{'id':'1'}}"
);
- assertJQ(req("qt","/get","ids","1")
+ assertJQ(req("qt","/get","ids","1", "fl","id")
,"=={" +
" 'response':{'numFound':1,'start':0,'docs':[" +
" {" +
@@ -64,10 +65,10 @@ public class TestRealTimeGet extends Sol
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
- assertJQ(req("qt","/get","id","1")
+ assertJQ(req("qt","/get","id","1", "fl","id")
,"=={'doc':{'id':'1'}}"
);
- assertJQ(req("qt","/get","ids","1")
+ assertJQ(req("qt","/get","ids","1", "fl","id")
,"=={" +
" 'response':{'numFound':1,'start':0,'docs':[" +
" {" +
@@ -136,9 +137,22 @@ public class TestRealTimeGet extends Sol
System.out.println(sb.toString());
}
+ /**
+  * Model entry for a single document: the version recorded for its most
+  * recent update together with the value written by that update.
+  * Writers in this test store a negated value after a delete.
+  */
+ static class DocInfo {
+   /** Version recorded in the model for the document's last update. */
+   long version;
+   /** Value recorded in the model for the document. */
+   long val;
+
+   /**
+    * @param version version to record for the document
+    * @param val     value to record for the document
+    */
+   public DocInfo(long version, long val) {
+     this.version = version;
+     this.val = val;
+   }
+
+   /** Renders both fields as {version=N,val=M}; used in verbose/error output. */
+   @Override
+   public String toString() {
+     // close the brace opened at the start of the string (was a stray quote)
+     return "{version="+version+",val="+val+"}";
+   }
+ }
- final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
- Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
+ final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<Integer,DocInfo>();
+ Map<Integer,DocInfo> committedModel = new HashMap<Integer,DocInfo>();
long snapshotCount;
long committedModelClock;
volatile int lastId;
@@ -153,7 +167,7 @@ public class TestRealTimeGet extends Sol
syncArr = new Object[ndocs];
for (int i=0; i<ndocs; i++) {
- model.put(i, -1L);
+ model.put(i, new DocInfo(0, -1L));
syncArr[i] = new Object();
}
committedModel.putAll(model);
@@ -199,11 +213,11 @@ public class TestRealTimeGet extends Sol
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,Long> newCommittedModel;
+ Map<Integer,DocInfo> newCommittedModel;
long version;
synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
version = snapshotCount++;
}
@@ -245,8 +259,11 @@ public class TestRealTimeGet extends Sol
// We can't concurrently update the same document and retain our invariants of increasing values
// since we can't guarantee what order the updates will be executed.
- synchronized (sync) {
- Long val = model.get(id);
+ // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+ synchronized (sync) {
+ DocInfo info = model.get(id);
+
+ long val = info.val;
long nextVal = Math.abs(val)+1;
if (oper < commitPercent + deletePercent) {
@@ -254,8 +271,11 @@ public class TestRealTimeGet extends Sol
verbose("deleting id",id,"val=",nextVal);
}
- assertU("<delete><id>" + id + "</id></delete>");
- model.put(id, -nextVal);
+ // assertU("<delete><id>" + id + "</id></delete>");
+ Long version = deleteAndGetVersion(Integer.toString(id));
+
+ model.put(id, new DocInfo(version, -nextVal));
+
if (VERBOSE) {
verbose("deleting id", id, "val=",nextVal,"DONE");
}
@@ -265,7 +285,7 @@ public class TestRealTimeGet extends Sol
}
assertU("<delete><query>id:" + id + "</query></delete>");
- model.put(id, -nextVal);
+ model.put(id, new DocInfo(-1L, -nextVal));
if (VERBOSE) {
verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
}
@@ -274,16 +294,251 @@ public class TestRealTimeGet extends Sol
verbose("adding id", id, "val=", nextVal);
}
- assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
- model.put(id, nextVal);
+ // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+ Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)));
+ model.put(id, new DocInfo(version, nextVal));
if (VERBOSE) {
verbose("adding id", id, "val=", nextVal,"DONE");
}
}
+ } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestRealTimeGet.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVal < Math.abs(info.val) || foundVer==info.version && (foundVal != info.val)) {
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ operations.set(-1L);
+ SolrException.log(log, e);
+ fail(e.getMessage());
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ }
+
+
+
+ @Test
+ public void testStressGetRealtimeVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ final int commitPercent = 5 + random.nextInt(20);
+ final int softCommitPercent = 30+random.nextInt(60); // what percent of the commits are soft
+ final int deletePercent = 4+random.nextInt(25);
+ final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query
+ final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+ int nWriteThreads = 5 + random.nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 60;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 5 + random.nextInt(25);
+
+
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random.nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestRealTimeGet.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(h.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("commit start");
+ assertU(commit());
+ verbose("commit end");
+ }
+
+ synchronized(TestRealTimeGet.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id = rand.nextInt(ndocs);
+ Object sync = syncArr[id];
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
}
+ // We can't concurrently update the same document and retain our invariants of increasing values
+ // since we can't guarantee what order the updates will be executed.
+ // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+ // synchronized (sync) {
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ if (oper < commitPercent + deletePercent) {
+ if (VERBOSE) {
+ verbose("deleting id",id,"val=",nextVal);
+ }
+
+ // assertU("<delete><id>" + id + "</id></delete>");
+ Long version = deleteAndGetVersion(Integer.toString(id));
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("deleting id", id, "val=",nextVal,"DONE");
+ }
+ } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ if (VERBOSE) {
+ verbose("deleteByQuery id ",id, "val=",nextVal);
+ }
+
+ assertU("<delete><query>id:" + id + "</query></delete>");
+ model.put(id, new DocInfo(-1L, -nextVal));
+ if (VERBOSE) {
+ verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
+ }
+ } else {
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal);
+ }
+
+ // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+ Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)));
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"DONE");
+ }
+
+ }
+ // } // end sync
+
if (!before) {
lastId = id;
}
@@ -315,13 +570,13 @@ public class TestRealTimeGet extends Sol
// so when querying, we should first check the model, and then the index
boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
- long val;
+ DocInfo info;
if (realTime) {
- val = model.get(id);
+ info = model.get(id);
} else {
synchronized(TestRealTimeGet.this) {
- val = committedModel.get(id);
+ info = committedModel.get(id);
}
}
@@ -343,9 +598,10 @@ public class TestRealTimeGet extends Sol
} else {
assertEquals(1, doclist.size());
long foundVal = (Long)(((Map)doclist.get(0)).get(field));
- if (foundVal < Math.abs(val)) {
- verbose("ERROR, id=", id, "foundVal=",foundVal,"model val=",val);
- assertTrue(foundVal >= Math.abs(val));
+ long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+ if (foundVer < Math.abs(info.version) || foundVer==info.version && (foundVal != info.val)) {
+ verbose("ERROR, id=", id, "found=",response,"model",info);
+ assertTrue(false);
}
}
}
@@ -372,4 +628,18 @@ public class TestRealTimeGet extends Sol
}
+
+ /**
+  * Sends the document as a JSON add command and parses the JSON response.
+  *
+  * @param sdoc the document to add
+  * @return the element at index 1 of the response's "adds" list, which the
+  *         callers record as the document's version
+  * @throws Exception if the update request or the response parsing fails
+  */
+ private Long addAndGetVersion(SolrInputDocument sdoc) throws Exception {
+   String addCommand = jsonAdd(sdoc);
+   // a null args value lets updateJ fall back to its own default parameters
+   String rawResponse = updateJ(addCommand, null);
+   Map parsed = (Map) ObjectBuilder.fromJSON(rawResponse);
+   List adds = (List) parsed.get("adds");
+   return (Long) adds.get(1);
+ }
+
+ /**
+  * Sends a JSON delete-by-id command and parses the JSON response.
+  *
+  * @param id the id of the document to delete; it is embedded in the command
+  *           as-is, without JSON escaping
+  * @return the element at index 1 of the response's "deletes" list, which the
+  *         callers record as the version of the delete
+  * @throws Exception if the update request or the response parsing fails
+  */
+ private Long deleteAndGetVersion(String id) throws Exception {
+   String deleteCommand = "{\"delete\":{\"id\":\""+id+"\"}}";
+   // a null args value lets updateJ fall back to its own default parameters
+   String rawResponse = updateJ(deleteCommand, null);
+   Map parsed = (Map) ObjectBuilder.fromJSON(rawResponse);
+   List deletes = (List) parsed.get("deletes");
+   return (Long) deletes.get(1);
+ }
+
+
}
Modified: lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java?rev=1181011&r1=1181010&r2=1181011&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java (original)
+++ lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java Mon Oct 10 15:12:31 2011
@@ -672,6 +672,7 @@ public abstract class SolrTestCaseJ4 ext
/** Send JSON update commands */
public static String updateJ(String json, SolrParams args) throws Exception {
SolrCore core = h.getCore();
+ if (args == null) args = params("wt","json","indent","true");
DirectSolrConnection connection = new DirectSolrConnection(core);
SolrRequestHandler handler = core.getRequestHandler("/udate/json");
if (handler == null) {
@@ -681,6 +682,63 @@ public abstract class SolrTestCaseJ4 ext
return connection.request(handler, args, json);
}
+ public static SolrInputDocument sdoc(Object... fieldsAndValues) {
+ SolrInputDocument sd = new SolrInputDocument();
+ for (int i=0; i<fieldsAndValues.length; i+=2) {
+ sd.addField((String)fieldsAndValues[i], fieldsAndValues[i+1]);
+ }
+ return sd;
+ }
+
+ /**
+  * Creates JSON from a SolrInputDocument. Doesn't currently handle boosts.
+  * Each field becomes a "name":value member; a field with more than one
+  * value is written as a JSON array of its values.
+  *
+  * @param doc the document to serialize
+  * @return the JSON object text for the document
+  * @throws RuntimeException wrapping any IOException raised while appending
+  *         to the output buffer (not expected to occur)
+  */
+ public static String json(SolrInputDocument doc) {
+   CharArr out = new CharArr();
+   try {
+     out.append('{');
+     boolean firstField = true;
+     for (SolrInputField sfield : doc) {
+       if (firstField) firstField=false;
+       else out.append(',');
+       JSONUtil.writeString(sfield.getName(), 0, sfield.getName().length(), out);
+       out.append(':');
+       // decide once whether the values need array brackets
+       boolean multiValued = sfield.getValueCount() > 1;
+       if (multiValued) {
+         out.append('[');
+       }
+       boolean firstVal = true;
+       for (Object val : sfield) {
+         if (firstVal) firstVal=false;
+         else out.append(',');
+         out.append(JSONUtil.toJSON(val));
+       }
+       if (multiValued) {
+         out.append(']');
+       }
+     }
+     out.append('}');
+   } catch (IOException e) {
+     // should never happen, but surface it rather than return truncated JSON
+     throw new RuntimeException("Unexpected IOException while building JSON for a document", e);
+   }
+   return out.toString();
+ }
+
+ /**
+  * Creates a JSON add command from a SolrInputDocument list. Doesn't currently handle boosts.
+  * The command is a JSON array holding the json(doc) text of each document, in order.
+  *
+  * @param docs the documents to include; an empty list yields "[]"
+  * @return the JSON array text for the add command
+  * @throws RuntimeException wrapping any IOException raised while appending
+  *         to the output buffer (not expected to occur)
+  */
+ public static String jsonAdd(SolrInputDocument... docs) {
+   CharArr out = new CharArr();
+   try {
+     out.append('[');
+     boolean firstDoc = true;
+     for (SolrInputDocument doc : docs) {
+       if (firstDoc) firstDoc=false;
+       else out.append(',');
+       out.append(json(doc));
+     }
+     out.append(']');
+   } catch (IOException e) {
+     // should never happen, but surface it rather than return a truncated command
+     throw new RuntimeException("Unexpected IOException while building JSON add command", e);
+   }
+   return out.toString();
+ }
+
/////////////////////////////////////////////////////////////////////////////////////
//////////////////////////// random document / index creation ///////////////////////