You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/07/03 17:05:34 UTC

svn commit: r1356778 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/search/

Author: yonik
Date: Tue Jul  3 15:05:30 2012
New Revision: 1356778

URL: http://svn.apache.org/viewvc?rev=1356778&view=rev
Log:
SOLR-3559: DBQ reorder support

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Jul  3 15:05:30 2012
@@ -35,6 +35,10 @@ New Features
 * SOLR-3488: Added a Collection management API for SolrCloud.
   (Tommaso Teofili, Sami Siren, yonik, Mark Miller)
 
+* SOLR-3559: Full deleteByQuery support with SolrCloud distributed indexing.
+  All replicas of a shard will be consistent, even if updates arrive in a
+  different order on different replicas.  (yonik)
+
 Bug Fixes
 
 * SOLR-3582: Our ZooKeeper watchers respond to session events as if they are change events, 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java Tue Jul  3 15:05:30 2012
@@ -881,13 +881,14 @@ public final class SolrCore implements S
   public static void verbose(Object... args) {
     if (!VERBOSE) return;
     StringBuilder sb = new StringBuilder("VERBOSE:");
-    sb.append(Thread.currentThread().getName());
-    sb.append(':');
+//    sb.append(Thread.currentThread().getName());
+//    sb.append(':');
     for (Object o : args) {
       sb.append(' ');
       sb.append(o==null ? "(null)" : o.toString());
     }
-    System.out.println(sb.toString());
+    // System.out.println(sb.toString());
+    log.info(sb.toString());
   }
 
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Tue Jul  3 15:05:30 2012
@@ -19,6 +19,7 @@ package org.apache.solr.update;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -27,6 +28,8 @@ import org.apache.solr.request.SolrQuery
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.SchemaField;
 
+import java.util.List;
+
 /**
  *
  */
@@ -42,6 +45,7 @@ public class AddUpdateCommand extends Up
    public boolean overwrite = true;
    
    public Term updateTerm;
+
    public int commitWithin = -1;
    
    public AddUpdateCommand(SolrQueryRequest req) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Tue Jul  3 15:05:30 2012
@@ -22,7 +22,9 @@ package org.apache.solr.update;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -160,37 +162,67 @@ public class DirectUpdateHandler2 extend
     try {
 
       if (cmd.overwrite) {
-        Term updateTerm;
-        Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
-        boolean del = false;
-        if (cmd.updateTerm == null) {
-          updateTerm = idTerm;
-        } else {
-          del = true;
-          updateTerm = cmd.updateTerm;
+
+        // Check for delete by query commands newer (i.e. reordered).  This should always be null on a leader
+        List<UpdateLog.DBQ> deletesAfter = null;
+        if (ulog != null && cmd.version > 0) {
+          deletesAfter = ulog.getDBQNewer(cmd.version);
         }
 
-        Document luceneDocument = cmd.getLuceneDocument();
-        // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
-        writer.updateDocument(updateTerm, luceneDocument);
-        // SolrCore.verbose("updateDocument",updateTerm,"DONE");
-
-        if(del) { // ensure id remains unique
-          BooleanQuery bq = new BooleanQuery();
-          bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
-          bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
-          writer.deleteDocuments(bq);
+        if (deletesAfter != null) {
+          List<Query> dbqList = new ArrayList<Query>(deletesAfter.size());
+          for (UpdateLog.DBQ dbq : deletesAfter) {
+            try {
+              DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
+              tmpDel.query = dbq.q;
+              tmpDel.version = -dbq.version;
+              dbqList.add( getQuery(tmpDel) );
+            } catch (Exception e) {
+              log.error("Exception parsing reordered query : " + dbq, e);
+            }
+          }
+
+          addAndDelete(cmd, dbqList);
+        } else {
+          // normal update
+
+          Term updateTerm;
+          Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
+          boolean del = false;
+          if (cmd.updateTerm == null) {
+            updateTerm = idTerm;
+          } else {
+            del = true;
+            updateTerm = cmd.updateTerm;
+          }
+
+          Document luceneDocument = cmd.getLuceneDocument();
+          // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
+          writer.updateDocument(updateTerm, luceneDocument);
+          // SolrCore.verbose("updateDocument",updateTerm,"DONE");
+
+
+          if(del) { // ensure id remains unique
+            BooleanQuery bq = new BooleanQuery();
+            bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
+            bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
+            writer.deleteDocuments(bq);
+          }
+
+
+          // Add to the transaction log *after* successfully adding to the index, if there was no error.
+          // This ordering ensures that if we log it, it's definitely been added to the the index.
+          // This also ensures that if a commit sneaks in-between, that we know everything in a particular
+          // log version was definitely committed.
+          if (ulog != null) ulog.add(cmd);
         }
+
       } else {
         // allow duplicates
         writer.addDocument(cmd.getLuceneDocument());
+        if (ulog != null) ulog.add(cmd);
       }
 
-      // Add to the transaction log *after* successfully adding to the index, if there was no error.
-      // This ordering ensures that if we log it, it's definitely been added to the the index.
-      // This also ensures that if a commit sneaks in-between, that we know everything in a particular
-      // log version was definitely committed.
-      if (ulog != null) ulog.add(cmd);
 
       if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
         commitTracker.addedDocument( -1 );
@@ -243,6 +275,43 @@ public class DirectUpdateHandler2 extend
     updateDeleteTrackers(cmd);
   }
 
+
+  public void clearIndex() throws IOException {
+    deleteAll();
+    if (ulog != null) {
+      ulog.deleteAll();
+    }
+  }
+
+
+  private Query getQuery(DeleteUpdateCommand cmd) {
+    Query q;
+    try {
+      // move this higher in the stack?
+      QParser parser = QParser.getParser(cmd.getQuery(), "lucene", cmd.req);
+      q = parser.getQuery();
+      q = QueryUtils.makeQueryable(q);
+
+      // Make sure not to delete newer versions
+      if (ulog != null && cmd.getVersion() != 0 && cmd.getVersion() != -Long.MAX_VALUE) {
+        BooleanQuery bq = new BooleanQuery();
+        bq.add(q, Occur.MUST);
+        SchemaField sf = ulog.getVersionInfo().getVersionField();
+        ValueSource vs = sf.getType().getValueSource(sf, null);
+        ValueSourceRangeFilter filt = new ValueSourceRangeFilter(vs, null, Long.toString(Math.abs(cmd.getVersion())), true, true);
+        FunctionRangeQuery range = new FunctionRangeQuery(filt);
+        bq.add(range, Occur.MUST);
+        q = bq;
+      }
+
+      return q;
+
+    } catch (ParseException e) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+    }
+  }
+
+
   // we don't return the number of docs deleted because it's not always possible to quickly know that info.
   @Override
   public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
@@ -250,35 +319,19 @@ public class DirectUpdateHandler2 extend
     deleteByQueryCommandsCumulative.incrementAndGet();
     boolean madeIt=false;
     try {
-      Query q;
-      try {
-        // TODO: move this higher in the stack?
-        QParser parser = QParser.getParser(cmd.query, "lucene", cmd.req);
-        q = parser.getQuery();
-        q = QueryUtils.makeQueryable(q);
-
-        // peer-sync can cause older deleteByQueries to be executed and could
-        // delete newer documents.  We prevent this by adding a clause restricting
-        // version.
-        if ((cmd.getFlags() & UpdateCommand.PEER_SYNC) != 0) {
-          BooleanQuery bq = new BooleanQuery();
-          bq.add(q, Occur.MUST);
-          SchemaField sf = core.getSchema().getField(VersionInfo.VERSION_FIELD);
-          ValueSource vs = sf.getType().getValueSource(sf, null);
-          ValueSourceRangeFilter filt = new ValueSourceRangeFilter(vs, null, Long.toString(Math.abs(cmd.version)), true, true);
-          FunctionRangeQuery range = new FunctionRangeQuery(filt);
-          bq.add(range, Occur.MUST);
-          q = bq;
-        }
-
-
-
-      } catch (ParseException e) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
-      }
+      Query q = getQuery(cmd);
       
       boolean delAll = MatchAllDocsQuery.class == q.getClass();
 
+      // currently for testing purposes.  Do a delete of complete index w/o worrying about versions, don't log, clean up most state in update log, etc
+      if (delAll && cmd.getVersion() == -Long.MAX_VALUE) {
+        synchronized (this) {
+          deleteAll();
+          ulog.deleteAll();
+          return;
+        }
+      }
+
       //
       // synchronized to prevent deleteByQuery from running during the "open new searcher"
       // part of a commit.  DBQ needs to signal that a fresh reader will be needed for
@@ -307,6 +360,31 @@ public class DirectUpdateHandler2 extend
     }
   }
 
+
+
+  /** Add a document execute the deletes as atomically as possible */
+  private void addAndDelete(AddUpdateCommand cmd, List<Query> dbqList) throws IOException {
+    Document luceneDocument = cmd.getLuceneDocument();
+    Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
+
+    // see comment in deleteByQuery
+    synchronized (this) {
+      IndexWriter writer = solrCoreState.getIndexWriter(core);
+
+      writer.updateDocument(idTerm, luceneDocument);
+
+      for (Query q : dbqList) {
+        writer.deleteDocuments(q);
+      }
+
+      if (ulog != null) ulog.add(cmd, true);
+    }
+
+  }
+
+
+
+
   @Override
   public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
     mergeIndexesCommands.incrementAndGet();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Tue Jul  3 15:05:30 2012
@@ -34,6 +34,7 @@ public abstract class UpdateCommand impl
   public static int REPLAY    = 0x00000002;    // update command is from replaying a log.
   public static int PEER_SYNC    = 0x00000004; // update command is a missing update being provided by a peer.
   public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
+  public static int CLEAR_CACHES = 0x00000010; // clear caches associated with the update log.  used when applying reordered DBQ updates when doing an add.
 
   public UpdateCommand(SolrQueryRequest req) {
     this.req = req;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Tue Jul  3 15:05:30 2012
@@ -17,6 +17,7 @@
 
 package org.apache.solr.update;
 
+import org.apache.lucene.search.Query;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -104,7 +105,9 @@ public class UpdateLog implements Plugin
   private TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap
 
   private final int numDeletesToKeep = 1000;
+  private final int numDeletesByQueryToKeep = 100;
   public final int numRecordsToKeep = 100;
+
   // keep track of deletes only... this is not updated on an add
   private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
     protected boolean removeEldestEntry(Map.Entry eldest) {
@@ -112,6 +115,18 @@ public class UpdateLog implements Plugin
     }
   };
 
+  public class DBQ {
+    public String q;     // the query string
+    public long version; // positive version of the DBQ
+
+    @Override
+    public String toString() {
+      return "DBQ{version=" + version + ",q="+q+"}";
+    }
+  }
+
+  private LinkedList<DBQ> deleteByQueries = new LinkedList<DBQ>();
+
   private String[] tlogFiles;
   private File tlogDir;
   private Collection<String> globalStrings;
@@ -207,6 +222,16 @@ public class UpdateLog implements Plugin
         DeleteUpdate du = startingUpdates.deleteList.get(i);
         oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
       }
+
+      // populate recent deleteByQuery commands
+      for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) {
+        Update update = startingUpdates.deleteByQueryList.get(i);
+        List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
+        long version = (Long) dbq.get(1);
+        String q = (String) dbq.get(2);
+        trackDeleteByQuery(q, version);
+      }
+
     } finally {
       startingUpdates.close();
     }
@@ -280,6 +305,11 @@ public class UpdateLog implements Plugin
 
 
   public void add(AddUpdateCommand cmd) {
+    add(cmd, false);
+  }
+
+
+  public void add(AddUpdateCommand cmd, boolean clearCaches) {
     // don't log if we are replaying from another log
     // TODO: we currently need to log to maintain correct versioning, rtg, etc
     // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
@@ -293,21 +323,43 @@ public class UpdateLog implements Plugin
         pos = tlog.write(cmd, operationFlags);
       }
 
-      // TODO: in the future we could support a real position for a REPLAY update.
-      // Only currently would be useful for RTG while in recovery mode though.
-      LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+      if (!clearCaches) {
+        // TODO: in the future we could support a real position for a REPLAY update.
+        // Only currently would be useful for RTG while in recovery mode though.
+        LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+        // only update our map if we're not buffering
+        if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+          map.put(cmd.getIndexedId(), ptr);
+        }
 
-      // only update our map if we're not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
-        map.put(cmd.getIndexedId(), ptr);
-      }
+        if (trace) {
+          log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+        }
+
+      } else {
+        // replicate the deleteByQuery logic.  See deleteByQuery for comments.
+
+        map.clear();
+
+        try {
+          RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+          holder.decref();
+        } catch (Throwable e) {
+          SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+        }
+
+        if (trace) {
+          log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " clearCaches=true");
+        }
 
-      if (trace) {
-        log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
       }
+
     }
   }
 
+
+
   public void delete(DeleteUpdateCommand cmd) {
     BytesRef br = cmd.getIndexedId();
 
@@ -350,6 +402,8 @@ public class UpdateLog implements Plugin
         // affected and hence we must purge our caches.
         map.clear();
 
+        trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
+
         // oldDeletes.clear();
 
         // We must cause a new IndexReader to be opened before anything looks at these caches again
@@ -373,6 +427,72 @@ public class UpdateLog implements Plugin
     }
   }
 
+  /** currently for testing only */
+  public void deleteAll() {
+    synchronized (this) {
+
+      try {
+        RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+        holder.decref();
+      } catch (Throwable e) {
+        SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+      }
+
+      map.clear();
+      oldDeletes.clear();
+      deleteByQueries.clear();
+    }
+  }
+
+
+  void trackDeleteByQuery(String q, long version) {
+    version = Math.abs(version);
+    DBQ dbq = new DBQ();
+    dbq.q = q;
+    dbq.version = version;
+
+    synchronized (this) {
+      if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) {
+        // common non-reordered case
+        deleteByQueries.addFirst(dbq);
+      } else {
+        // find correct insertion point
+        ListIterator<DBQ> iter = deleteByQueries.listIterator();
+        iter.next();  // we already checked the first element in the previous "if" clause
+        while (iter.hasNext()) {
+          DBQ oldDBQ = iter.next();
+          if (oldDBQ.version < version) {
+            iter.previous();
+            break;
+          } else if (oldDBQ.version == version && oldDBQ.q.equals(q)) {
+            // a duplicate
+            return;
+          }
+        }
+        iter.add(dbq);  // this also handles the case of adding at the end when hasNext() == false
+      }
+
+      if (deleteByQueries.size() > numDeletesByQueryToKeep) {
+        deleteByQueries.removeLast();
+      }
+    }
+  }
+
+  public List<DBQ> getDBQNewer(long version) {
+    synchronized (this) {
+      if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) {
+        // fast common case
+        return null;
+      }
+
+      List<DBQ> dbqList = new ArrayList<DBQ>();
+      for (DBQ dbq : deleteByQueries) {
+        if (dbq.version <= version) break;
+        dbqList.add(dbq);
+      }
+      return dbqList;
+    }
+  }
 
   private void newMap() {
     prevMap2 = prevMap;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Tue Jul  3 15:05:30 2012
@@ -443,6 +443,9 @@ public class DistributedUpdateProcessor 
         // realtime-get to work reliably.
         // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
         // there may be other reasons in the future for a version on the commands
+
+        boolean checkDeleteByQueries = false;
+
         if (versionsStored) {
 
           long bucketVersion = bucket.highest;
@@ -484,7 +487,7 @@ public class DistributedUpdateProcessor 
             // if we aren't the leader, then we need to check that updates were not re-ordered
             if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
               // we're OK... this update has a version higher than anything we've seen
-              // in this bucket so far, so we know that no reordering has yet occured.
+              // in this bucket so far, so we know that no reordering has yet occurred.
               bucket.updateHighest(versionOnUpdate);
             } else {
               // there have been updates higher than the current update.  we need to check
@@ -494,11 +497,16 @@ public class DistributedUpdateProcessor 
                 // This update is a repeat, or was reordered.  We need to drop this update.
                 return true;
               }
+
+              // also need to re-apply newer deleteByQuery commands
+              checkDeleteByQueries = true;
             }
           }
         }
 
+        // TODO: possibly set checkDeleteByQueries as a flag on the command?
         doLocalAdd(cmd);
+
       }  // end synchronized (bucket)
     } finally {
       vinfo.unlockForUpdate();
@@ -654,7 +662,7 @@ public class DistributedUpdateProcessor 
     // FROM: we are a replica receiving a DBQ from our leader
     //       - log + execute the local DBQ
     DistribPhase phase = 
-      DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+    DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
 
     if (zkEnabled && DistribPhase.NONE == phase) {
       boolean leaderForAnyShard = false;  // start off by assuming we are not a leader for any shard
@@ -772,8 +780,6 @@ public class DistributedUpdateProcessor 
     }
 
 
-
-    // TODO: need to handle reorders to replicas somehow
     // forward to all replicas
     if (leaderLogic && replicas != null) {
       ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Tue Jul  3 15:05:30 2012
@@ -63,6 +63,18 @@ public class TestRealTimeGet extends Sol
     initCore("solrconfig-tlog.xml","schema15.xml");
   }
 
+  // since we make up fake versions in these tests, we can get messed up by a DBQ with a real version
+  // since Solr can think following updates were reordered.
+  @Override
+  public void clearIndex() {
+    try {
+      deleteByQueryAndGetVersion("*:*", params("_version_", Long.toString(-Long.MAX_VALUE), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
   @Test
   public void testGetRealtime() throws Exception {
     clearIndex();
@@ -212,7 +224,7 @@ public class TestRealTimeGet extends Sol
     assertU(commit());
 
     // make sure a reordered add doesn't take affect.
-    updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+    long version2 = deleteByQueryAndGetVersion("id:2", null);
 
     // test that it's still deleted
     assertJQ(req("qt","/get","id","1")
@@ -220,9 +232,33 @@ public class TestRealTimeGet extends Sol
     );
 
     version = addAndGetVersion(sdoc("id","2"), null);
-    long version2 = deleteByQueryAndGetVersion("id:2", null);
+    version2 = deleteByQueryAndGetVersion("id:2", null);
     assertTrue(Math.abs(version2) > version );
-    
+
+    // test that it's deleted
+    assertJQ(req("qt","/get","id","2")
+        ,"=={'doc':null}");
+
+
+    version2 = Math.abs(version2) + 1000;
+    updateJ(jsonAdd(sdoc("id","3", "_version_",Long.toString(version2+100))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+    updateJ(jsonAdd(sdoc("id","4", "_version_",Long.toString(version2+200))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+    // this should only affect id:3 so far
+    deleteByQueryAndGetVersion("id:(3 4 5 6)", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(-(version2+150))) );
+
+    assertJQ(req("qt","/get","id","3"),"=={'doc':null}");
+    assertJQ(req("qt","/get","id","4", "fl","id"),"=={'doc':{'id':'4'}}");
+
+    updateJ(jsonAdd(sdoc("id","5", "_version_",Long.toString(version2+201))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+    updateJ(jsonAdd(sdoc("id","6", "_version_",Long.toString(version2+101))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+   // the DBQ should also have caused id:6 to be removed
+    assertJQ(req("qt","/get","id","5", "fl","id"),"=={'doc':{'id':'5'}}");
+    assertJQ(req("qt","/get","id","6"),"=={'doc':null}");
+
+    assertU(commit());
+
   }
 
   @Test
@@ -425,7 +461,7 @@ public class TestRealTimeGet extends Sol
     }
 
     public String toString() {
-      return "{version="+version+",val="+val+"\"";
+      return "{version="+version+",val="+val+"}";
     }
   }
 
@@ -978,7 +1014,7 @@ public class TestRealTimeGet extends Sol
     final int commitPercent = 5 + random().nextInt(20);
     final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
     final int deletePercent = 4+random().nextInt(25);
-    final int deleteByQueryPercent = 0;  // delete-by-query can't be reordered on replicas
+    final int deleteByQueryPercent = 1+random().nextInt(7);
     final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
     int nWriteThreads = 5 + random().nextInt(25);
 
@@ -989,6 +1025,24 @@ public class TestRealTimeGet extends Sol
     final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
     int nReadThreads = 5 + random().nextInt(25);
 
+
+    /** // testing
+    final int commitPercent = 5;
+    final int softCommitPercent = 100; // what percent of the commits are soft
+    final int deletePercent = 0;
+    final int deleteByQueryPercent = 50;
+    final int ndocs = 1;
+    int nWriteThreads = 2;
+
+    final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
+
+    // query variables
+    final int percentRealtimeQuery = 101;
+    final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
+    int nReadThreads = 1;
+    **/
+
+
     initModel(ndocs);
 
     final AtomicInteger numCommitting = new AtomicInteger();
@@ -1094,6 +1148,26 @@ public class TestRealTimeGet extends Sol
                 verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
               } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
 
+                verbose("deleteByQuery id",id,"val=",nextVal,"version",version);
+
+                Long returnedVersion = deleteByQueryAndGetVersion("id:"+Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+                // TODO: returning versions for these types of updates is redundant
+                // but if we do return, they had better be equal
+                if (returnedVersion != null) {
+                  assertEquals(-version, returnedVersion.longValue());
+                }
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
+
+                verbose("deleteByQuery id", id, "val=",nextVal,"version",version,"DONE");
+
               } else {
                 verbose("adding id", id, "val=", nextVal,"version",version);
 
@@ -1123,6 +1197,7 @@ public class TestRealTimeGet extends Sol
           }
           } catch (Throwable e) {
             operations.set(-1L);
+            log.error("",e);
             throw new RuntimeException(e);
           }
         }
@@ -1185,6 +1260,7 @@ public class TestRealTimeGet extends Sol
             }
           } catch (Throwable e) {
             operations.set(-1L);
+            log.error("",e);
             throw new RuntimeException(e);
           }
         }
@@ -1225,7 +1301,7 @@ public class TestRealTimeGet extends Sol
     final int commitPercent = 5 + random().nextInt(10);
     final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
     final int deletePercent = 4+random().nextInt(25);
-    final int deleteByQueryPercent = 0;  // real-time get isn't currently supported with delete-by-query
+    final int deleteByQueryPercent = random().nextInt(5);  // real-time get isn't currently supported with delete-by-query
     final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
     int nWriteThreads = 2 + random().nextInt(10);  // fewer write threads to give recovery thread more of a chance
 
@@ -1364,6 +1440,26 @@ public class TestRealTimeGet extends Sol
                 verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
               } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
 
+                verbose("deleteByQuery id",id,"val=",nextVal,"version",version);
+
+                Long returnedVersion = deleteByQueryAndGetVersion("id:"+Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+                // TODO: returning versions for these types of updates is redundant
+                // but if we do return, they had better be equal
+                if (returnedVersion != null) {
+                  assertEquals(-version, returnedVersion.longValue());
+                }
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
+
+                verbose("deleteByQuery id", id, "val=",nextVal,"version",version,"DONE");
+
               } else {
                 verbose("adding id", id, "val=", nextVal,"version",version);
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java Tue Jul  3 15:05:30 2012
@@ -69,6 +69,19 @@ public class TestRecovery extends SolrTe
     }
   }
 
+
+  // since we make up fake versions in these tests, we can get messed up by a DBQ with a real version
+  // since Solr can think following updates were reordered.
+  @Override
+  public void clearIndex() {
+    try {
+      deleteByQueryAndGetVersion("*:*", params("_version_", Long.toString(-Long.MAX_VALUE), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
   @Test
   public void testLogReplay() throws Exception {
     try {