You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/10/10 17:12:32 UTC

svn commit: r1181011 - in /lucene/dev/branches/solrcloud/solr: core/src/test-files/solr/conf/schema12.xml core/src/test/org/apache/solr/search/TestRealTimeGet.java test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java

Author: yonik
Date: Mon Oct 10 15:12:31 2011
New Revision: 1181011

URL: http://svn.apache.org/viewvc?rev=1181011&view=rev
Log:
SOLR-2816: test versions via realtime get

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
    lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml?rev=1181011&r1=1181010&r2=1181011&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test-files/solr/conf/schema12.xml Mon Oct 10 15:12:31 2011
@@ -530,6 +530,11 @@
    <field name="uniq3" type="string" indexed="true" stored="true"/>
    <field name="nouniq" type="string" indexed="true" stored="true" multiValued="true"/>
 
+   <!-- for versioning -->
+   <field name="_version_" type="long" indexed="true" stored="true"/>
+
+
+
    <dynamicField name="*_coordinate"  type="tdouble" indexed="true"  stored="false"/>
 
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1181011&r1=1181010&r2=1181011&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Mon Oct 10 15:12:31 2011
@@ -20,6 +20,7 @@ package org.apache.solr.search;
 import org.apache.noggit.ObjectBuilder;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.request.SolrQueryRequest;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -48,10 +49,10 @@ public class TestRealTimeGet extends Sol
     assertJQ(req("q","id:1")
         ,"/response/numFound==0"
     );
-    assertJQ(req("qt","/get","id","1")
+    assertJQ(req("qt","/get", "id","1", "fl","id")
         ,"=={'doc':{'id':'1'}}"
     );
-    assertJQ(req("qt","/get","ids","1")
+    assertJQ(req("qt","/get","ids","1", "fl","id")
         ,"=={" +
         "  'response':{'numFound':1,'start':0,'docs':[" +
         "      {" +
@@ -64,10 +65,10 @@ public class TestRealTimeGet extends Sol
     assertJQ(req("q","id:1")
         ,"/response/numFound==1"
     );
-    assertJQ(req("qt","/get","id","1")
+    assertJQ(req("qt","/get","id","1", "fl","id")
         ,"=={'doc':{'id':'1'}}"
     );
-    assertJQ(req("qt","/get","ids","1")
+    assertJQ(req("qt","/get","ids","1", "fl","id")
         ,"=={" +
         "  'response':{'numFound':1,'start':0,'docs':[" +
         "      {" +
@@ -136,9 +137,22 @@ public class TestRealTimeGet extends Sol
     System.out.println(sb.toString());
   }
 
+  /**
+   * Model entry for a single document in the stress tests: the version recorded for
+   * the most recent modeled update and the value associated with it.  Callers in this
+   * file store a negated value when the last modeled operation was a delete.
+   */
+  static class DocInfo {
+    long version;
+    long val;
+
+    /**
+     * @param version version recorded for the update
+     * @param val     value recorded for the update
+     */
+    public DocInfo(long version, long val) {
+      this.version = version;
+      this.val = val;
+    }
+
+    /** Renders the entry as {@code {version=V,val=N}} for diagnostic output. */
+    @Override
+    public String toString() {
+      // terminate with '}' so the braces balance (previously ended with a stray '"')
+      return "{version="+version+",val="+val+"}";
+    }
+  }
 
-  final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
-  Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
+  final ConcurrentHashMap<Integer,DocInfo> model = new ConcurrentHashMap<Integer,DocInfo>();
+  Map<Integer,DocInfo> committedModel = new HashMap<Integer,DocInfo>();
   long snapshotCount;
   long committedModelClock;
   volatile int lastId;
@@ -153,7 +167,7 @@ public class TestRealTimeGet extends Sol
     syncArr = new Object[ndocs];
 
     for (int i=0; i<ndocs; i++) {
-      model.put(i, -1L);
+      model.put(i, new DocInfo(0, -1L));
       syncArr[i] = new Object();
     }
     committedModel.putAll(model);
@@ -199,11 +213,11 @@ public class TestRealTimeGet extends Sol
 
             if (oper < commitPercent) {
               if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
-                Map<Integer,Long> newCommittedModel;
+                Map<Integer,DocInfo> newCommittedModel;
                 long version;
 
                 synchronized(TestRealTimeGet.this) {
-                  newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
+                  newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
                   version = snapshotCount++;
                 }
 
@@ -245,8 +259,11 @@ public class TestRealTimeGet extends Sol
 
             // We can't concurrently update the same document and retain our invariants of increasing values
             // since we can't guarantee what order the updates will be executed.
-            synchronized (sync) {
-              Long val = model.get(id);
+            // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+           synchronized (sync) {
+              DocInfo info = model.get(id);
+
+              long val = info.val;
               long nextVal = Math.abs(val)+1;
 
               if (oper < commitPercent + deletePercent) {
@@ -254,8 +271,11 @@ public class TestRealTimeGet extends Sol
                   verbose("deleting id",id,"val=",nextVal);
                 }
 
-                assertU("<delete><id>" + id + "</id></delete>");
-                model.put(id, -nextVal);
+                // assertU("<delete><id>" + id + "</id></delete>");
+                Long version = deleteAndGetVersion(Integer.toString(id));
+
+                model.put(id, new DocInfo(version, -nextVal));
+
                 if (VERBOSE) {
                   verbose("deleting id", id, "val=",nextVal,"DONE");
                 }
@@ -265,7 +285,7 @@ public class TestRealTimeGet extends Sol
                 }
 
                 assertU("<delete><query>id:" + id + "</query></delete>");
-                model.put(id, -nextVal);
+                model.put(id, new DocInfo(-1L, -nextVal));
                 if (VERBOSE) {
                   verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
                 }
@@ -274,16 +294,251 @@ public class TestRealTimeGet extends Sol
                   verbose("adding id", id, "val=", nextVal);
                 }
 
-                assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
-                model.put(id, nextVal);
+                // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)));
+                model.put(id, new DocInfo(version, nextVal));
 
                 if (VERBOSE) {
                   verbose("adding id", id, "val=", nextVal,"DONE");
                 }
 
               }
+            }   // end sync
+
+            if (!before) {
+              lastId = id;
+            }
+          }
+        } catch (Throwable e) {
+          operations.set(-1L);
+          SolrException.log(log, e);
+          fail(e.getMessage());
+        }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+              DocInfo info;
+
+              if (realTime) {
+                info = model.get(id);
+              } else {
+                synchronized(TestRealTimeGet.this) {
+                  info = committedModel.get(id);
+                }
+              }
+
+              if (VERBOSE) {
+                verbose("querying id", id);
+              }
+              SolrQueryRequest sreq;
+              if (realTime) {
+                sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+              } else {
+                sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              }
+
+              String response = h.query(sreq);
+              Map rsp = (Map)ObjectBuilder.fromJSON(response);
+              List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+              if (doclist.size() == 0) {
+                // there's no info we can get back with a delete, so not much we can check without further synchronization
+              } else {
+                assertEquals(1, doclist.size());
+                long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+                long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+                if (foundVal < Math.abs(info.val) || foundVer==info.version && (foundVal != info.val)) {
+                  verbose("ERROR, id=", id, "found=",response,"model",info);
+                  assertTrue(false);
+                }
+              }
+            }
+          }
+          catch (Throwable e) {
+            operations.set(-1L);
+            SolrException.log(log, e);
+            fail(e.getMessage());
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+  }
+
+
+
+  @Test
+  public void testStressGetRealtimeVersions() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    final int commitPercent = 5 + random.nextInt(20);
+    final int softCommitPercent = 30+random.nextInt(60); // what percent of the commits are soft
+    final int deletePercent = 4+random.nextInt(25);
+    final int deleteByQueryPercent = 0;  // real-time get isn't currently supported with delete-by-query
+    final int ndocs = 5 + (random.nextBoolean() ? random.nextInt(25) : random.nextInt(200));
+    int nWriteThreads = 5 + random.nextInt(25);
+
+    final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
+
+        // query variables
+    final int percentRealtimeQuery = 60;
+    final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
+    int nReadThreads = 5 + random.nextInt(25);
+
+
+
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+          while (operations.get() > 0) {
+            int oper = rand.nextInt(100);
+
+            if (oper < commitPercent) {
+              if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                Map<Integer,DocInfo> newCommittedModel;
+                long version;
+
+                synchronized(TestRealTimeGet.this) {
+                  newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
+                  version = snapshotCount++;
+                }
+
+                if (rand.nextInt(100) < softCommitPercent) {
+                  verbose("softCommit start");
+                  assertU(h.commit("softCommit","true"));
+                  verbose("softCommit end");
+                } else {
+                  verbose("commit start");
+                  assertU(commit());
+                  verbose("commit end");
+                }
+
+                synchronized(TestRealTimeGet.this) {
+                  // install this model snapshot only if it's newer than the current one
+                  if (version >= committedModelClock) {
+                    if (VERBOSE) {
+                      verbose("installing new committedModel version="+committedModelClock);
+                    }
+                    committedModel = newCommittedModel;
+                    committedModelClock = version;
+                  }
+                }
+              }
+              numCommitting.decrementAndGet();
+              continue;
+            }
+
+
+            int id = rand.nextInt(ndocs);
+            Object sync = syncArr[id];
+
+            // set the lastId before we actually change it sometimes to try and
+            // uncover more race conditions between writing and reading
+            boolean before = rand.nextBoolean();
+            if (before) {
+              lastId = id;
             }
 
+            // We can't concurrently update the same document and retain our invariants of increasing values
+            // since we can't guarantee what order the updates will be executed.
+            // Even with versions, we can't remove the sync because increasing versions does not mean increasing vals.
+           // synchronized (sync) {
+              DocInfo info = model.get(id);
+
+              long val = info.val;
+              long nextVal = Math.abs(val)+1;
+
+              if (oper < commitPercent + deletePercent) {
+                if (VERBOSE) {
+                  verbose("deleting id",id,"val=",nextVal);
+                }
+
+                // assertU("<delete><id>" + id + "</id></delete>");
+                Long version = deleteAndGetVersion(Integer.toString(id));
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (version > currInfo.version) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
+
+                if (VERBOSE) {
+                  verbose("deleting id", id, "val=",nextVal,"DONE");
+                }
+              } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                if (VERBOSE) {
+                  verbose("deleteByQuery id ",id, "val=",nextVal);
+                }
+
+                assertU("<delete><query>id:" + id + "</query></delete>");
+                model.put(id, new DocInfo(-1L, -nextVal));
+                if (VERBOSE) {
+                  verbose("deleteByQuery id",id, "val=",nextVal,"DONE");
+                }
+              } else {
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal);
+                }
+
+                // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                Long version = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal)));
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (version > currInfo.version) {
+                    model.put(id, new DocInfo(version, nextVal));
+                  }
+                }
+
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal,"DONE");
+                }
+
+              }
+            // }   // end sync
+
             if (!before) {
               lastId = id;
             }
@@ -315,13 +570,13 @@ public class TestRealTimeGet extends Sol
               // so when querying, we should first check the model, and then the index
 
               boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
-              long val;
+              DocInfo info;
 
               if (realTime) {
-                val = model.get(id);
+                info = model.get(id);
               } else {
                 synchronized(TestRealTimeGet.this) {
-                  val = committedModel.get(id);
+                  info = committedModel.get(id);
                 }
               }
 
@@ -343,9 +598,10 @@ public class TestRealTimeGet extends Sol
               } else {
                 assertEquals(1, doclist.size());
                 long foundVal = (Long)(((Map)doclist.get(0)).get(field));
-                if (foundVal < Math.abs(val)) {
-                  verbose("ERROR, id=", id, "foundVal=",foundVal,"model val=",val);
-                  assertTrue(foundVal >= Math.abs(val));
+                long foundVer = (Long)(((Map)doclist.get(0)).get("_version_"));
+                if (foundVer < Math.abs(info.version) || foundVer==info.version && (foundVal != info.val)) {
+                  verbose("ERROR, id=", id, "found=",response,"model",info);
+                  assertTrue(false);
                 }
               }
             }
@@ -372,4 +628,18 @@ public class TestRealTimeGet extends Sol
 
   }
 
+
+  /**
+   * Sends the document as a JSON add command and returns the element at index 1 of
+   * the "adds" list in the parsed response, which callers treat as the version.
+   */
+  private Long addAndGetVersion(SolrInputDocument sdoc) throws Exception {
+    Map parsed = (Map)ObjectBuilder.fromJSON(updateJ(jsonAdd(sdoc), null));
+    List adds = (List)parsed.get("adds");
+    return (Long)adds.get(1);
+  }
+
+  /**
+   * Sends a JSON delete-by-id command and returns the element at index 1 of the
+   * "deletes" list in the parsed response, which callers treat as the version.
+   * The id is spliced into the JSON text as-is (no escaping is applied).
+   */
+  private Long deleteAndGetVersion(String id) throws Exception {
+    String deleteCmd = "{\"delete\":{\"id\":\""+id+"\"}}";
+    Map parsed = (Map)ObjectBuilder.fromJSON(updateJ(deleteCmd, null));
+    List deletes = (List)parsed.get("deletes");
+    return (Long)deletes.get(1);
+  }
+
+
 }

Modified: lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java?rev=1181011&r1=1181010&r2=1181011&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java (original)
+++ lucene/dev/branches/solrcloud/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java Mon Oct 10 15:12:31 2011
@@ -672,6 +672,7 @@ public abstract class SolrTestCaseJ4 ext
   /** Send JSON update commands */
   public static String updateJ(String json, SolrParams args) throws Exception {
     SolrCore core = h.getCore();
+    if (args == null) args = params("wt","json","indent","true");
     DirectSolrConnection connection = new DirectSolrConnection(core);
     SolrRequestHandler handler = core.getRequestHandler("/udate/json");
     if (handler == null) {
@@ -681,6 +682,63 @@ public abstract class SolrTestCaseJ4 ext
     return connection.request(handler, args, json);
   }
 
+  /**
+   * Builds a SolrInputDocument from alternating field names and values.
+   *
+   * @param fieldsAndValues name1, value1, name2, value2, ...; every name is cast to String
+   * @return a new document with one addField call per name/value pair
+   * @throws IllegalArgumentException if an odd number of arguments is supplied
+   */
+  public static SolrInputDocument sdoc(Object... fieldsAndValues) {
+    // reject a dangling field name up front instead of failing on the last pair
+    if (fieldsAndValues.length % 2 != 0) {
+      throw new IllegalArgumentException(
+          "sdoc requires name/value pairs, got " + fieldsAndValues.length + " arguments");
+    }
+    SolrInputDocument sd = new SolrInputDocument();
+    for (int i=0; i<fieldsAndValues.length; i+=2) {
+      sd.addField((String)fieldsAndValues[i], fieldsAndValues[i+1]);
+    }
+    return sd;
+  }
+
+  /**
+   * Creates JSON from a SolrInputDocument.  Doesn't currently handle boosts.
+   * Each field becomes a "name":value member; a field reporting exactly one value is
+   * written bare, any other value count is written as a JSON array.
+   *
+   * @param doc the document whose fields are serialized
+   * @return the JSON object text
+   * @throws RuntimeException wrapping an IOException raised while appending
+   */
+  public static String json(SolrInputDocument doc) {
+    CharArr out = new CharArr();
+    try {
+      out.append('{');
+      boolean firstField = true;
+      for (SolrInputField sfield : doc) {
+        if (firstField) firstField=false;
+        else out.append(',');
+        JSONUtil.writeString(sfield.getName(), 0, sfield.getName().length(), out);
+        out.append(':');
+        // use an array unless there is exactly one value, so a field without values
+        // yields "name":[] rather than a member with nothing after the colon
+        boolean asArray = sfield.getValueCount() != 1;
+        if (asArray) {
+          out.append('[');
+        }
+        boolean firstVal = true;
+        for (Object val : sfield) {
+          if (firstVal) firstVal=false;
+          else out.append(',');
+          out.append(JSONUtil.toJSON(val));
+        }
+        if (asArray) {
+          out.append(']');
+        }
+      }
+      out.append('}');
+    } catch (IOException e) {
+      // considered impossible by the original code; surface it rather than
+      // silently returning truncated JSON
+      throw new RuntimeException(e);
+    }
+    return out.toString();
+  }
+
+  /**
+   * Creates a JSON add command from a SolrInputDocument list.  Doesn't currently handle boosts.
+   * The result is a JSON array containing the json(doc) text of each document in order.
+   *
+   * @param docs the documents to include
+   * @return the JSON array text
+   * @throws RuntimeException wrapping an IOException raised while appending
+   */
+  public static String jsonAdd(SolrInputDocument... docs) {
+    CharArr out = new CharArr();
+    try {
+      out.append('[');
+      boolean firstDoc = true;
+      for (SolrInputDocument doc : docs) {
+        if (firstDoc) firstDoc=false;
+        else out.append(',');
+        out.append(json(doc));
+      }
+      out.append(']');
+    } catch (IOException e) {
+      // considered impossible by the original code; surface it rather than
+      // silently returning a truncated command
+      throw new RuntimeException(e);
+    }
+    return out.toString();
+  }
+
 
   /////////////////////////////////////////////////////////////////////////////////////
   //////////////////////////// random document / index creation ///////////////////////