You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2012/07/03 17:05:34 UTC
svn commit: r1356778 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/core/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/search/
Author: yonik
Date: Tue Jul 3 15:05:30 2012
New Revision: 1356778
URL: http://svn.apache.org/viewvc?rev=1356778&view=rev
Log:
SOLR-3559: DBQ reorder support
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Jul 3 15:05:30 2012
@@ -35,6 +35,10 @@ New Features
* SOLR-3488: Added a Collection management API for SolrCloud.
(Tommaso Teofili, Sami Siren, yonik, Mark Miller)
+* SOLR-3559: Full deleteByQuery support with SolrCloud distributed indexing.
+ All replicas of a shard will be consistent, even if updates arrive in a
+ different order on different replicas. (yonik)
+
Bug Fixes
* SOLR-3582: Our ZooKeeper watchers respond to session events as if they are change events,
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/SolrCore.java Tue Jul 3 15:05:30 2012
@@ -881,13 +881,14 @@ public final class SolrCore implements S
public static void verbose(Object... args) {
if (!VERBOSE) return;
StringBuilder sb = new StringBuilder("VERBOSE:");
- sb.append(Thread.currentThread().getName());
- sb.append(':');
+// sb.append(Thread.currentThread().getName());
+// sb.append(':');
for (Object o : args) {
sb.append(' ');
sb.append(o==null ? "(null)" : o.toString());
}
- System.out.println(sb.toString());
+ // System.out.println(sb.toString());
+ log.info(sb.toString());
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java Tue Jul 3 15:05:30 2012
@@ -19,6 +19,7 @@ package org.apache.solr.update;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -27,6 +28,8 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
+import java.util.List;
+
/**
*
*/
@@ -42,6 +45,7 @@ public class AddUpdateCommand extends Up
public boolean overwrite = true;
public Term updateTerm;
+
public int commitWithin = -1;
public AddUpdateCommand(SolrQueryRequest req) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Tue Jul 3 15:05:30 2012
@@ -22,7 +22,9 @@ package org.apache.solr.update;
import java.io.IOException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -160,37 +162,67 @@ public class DirectUpdateHandler2 extend
try {
if (cmd.overwrite) {
- Term updateTerm;
- Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
- boolean del = false;
- if (cmd.updateTerm == null) {
- updateTerm = idTerm;
- } else {
- del = true;
- updateTerm = cmd.updateTerm;
+
+ // Check for delete by query commands newer (i.e. reordered). This should always be null on a leader
+ List<UpdateLog.DBQ> deletesAfter = null;
+ if (ulog != null && cmd.version > 0) {
+ deletesAfter = ulog.getDBQNewer(cmd.version);
}
- Document luceneDocument = cmd.getLuceneDocument();
- // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
- writer.updateDocument(updateTerm, luceneDocument);
- // SolrCore.verbose("updateDocument",updateTerm,"DONE");
-
- if(del) { // ensure id remains unique
- BooleanQuery bq = new BooleanQuery();
- bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
- bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
- writer.deleteDocuments(bq);
+ if (deletesAfter != null) {
+ List<Query> dbqList = new ArrayList<Query>(deletesAfter.size());
+ for (UpdateLog.DBQ dbq : deletesAfter) {
+ try {
+ DeleteUpdateCommand tmpDel = new DeleteUpdateCommand(cmd.req);
+ tmpDel.query = dbq.q;
+ tmpDel.version = -dbq.version;
+ dbqList.add( getQuery(tmpDel) );
+ } catch (Exception e) {
+ log.error("Exception parsing reordered query : " + dbq, e);
+ }
+ }
+
+ addAndDelete(cmd, dbqList);
+ } else {
+ // normal update
+
+ Term updateTerm;
+ Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
+ boolean del = false;
+ if (cmd.updateTerm == null) {
+ updateTerm = idTerm;
+ } else {
+ del = true;
+ updateTerm = cmd.updateTerm;
+ }
+
+ Document luceneDocument = cmd.getLuceneDocument();
+ // SolrCore.verbose("updateDocument",updateTerm,luceneDocument,writer);
+ writer.updateDocument(updateTerm, luceneDocument);
+ // SolrCore.verbose("updateDocument",updateTerm,"DONE");
+
+
+ if(del) { // ensure id remains unique
+ BooleanQuery bq = new BooleanQuery();
+ bq.add(new BooleanClause(new TermQuery(updateTerm), Occur.MUST_NOT));
+ bq.add(new BooleanClause(new TermQuery(idTerm), Occur.MUST));
+ writer.deleteDocuments(bq);
+ }
+
+
+ // Add to the transaction log *after* successfully adding to the index, if there was no error.
+ // This ordering ensures that if we log it, it's definitely been added to the the index.
+ // This also ensures that if a commit sneaks in-between, that we know everything in a particular
+ // log version was definitely committed.
+ if (ulog != null) ulog.add(cmd);
}
+
} else {
// allow duplicates
writer.addDocument(cmd.getLuceneDocument());
+ if (ulog != null) ulog.add(cmd);
}
- // Add to the transaction log *after* successfully adding to the index, if there was no error.
- // This ordering ensures that if we log it, it's definitely been added to the the index.
- // This also ensures that if a commit sneaks in-between, that we know everything in a particular
- // log version was definitely committed.
- if (ulog != null) ulog.add(cmd);
if ((cmd.getFlags() & UpdateCommand.IGNORE_AUTOCOMMIT) == 0) {
commitTracker.addedDocument( -1 );
@@ -243,6 +275,43 @@ public class DirectUpdateHandler2 extend
updateDeleteTrackers(cmd);
}
+
+ public void clearIndex() throws IOException {
+ deleteAll();
+ if (ulog != null) {
+ ulog.deleteAll();
+ }
+ }
+
+
+ private Query getQuery(DeleteUpdateCommand cmd) {
+ Query q;
+ try {
+ // move this higher in the stack?
+ QParser parser = QParser.getParser(cmd.getQuery(), "lucene", cmd.req);
+ q = parser.getQuery();
+ q = QueryUtils.makeQueryable(q);
+
+ // Make sure not to delete newer versions
+ if (ulog != null && cmd.getVersion() != 0 && cmd.getVersion() != -Long.MAX_VALUE) {
+ BooleanQuery bq = new BooleanQuery();
+ bq.add(q, Occur.MUST);
+ SchemaField sf = ulog.getVersionInfo().getVersionField();
+ ValueSource vs = sf.getType().getValueSource(sf, null);
+ ValueSourceRangeFilter filt = new ValueSourceRangeFilter(vs, null, Long.toString(Math.abs(cmd.getVersion())), true, true);
+ FunctionRangeQuery range = new FunctionRangeQuery(filt);
+ bq.add(range, Occur.MUST);
+ q = bq;
+ }
+
+ return q;
+
+ } catch (ParseException e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+ }
+ }
+
+
// we don't return the number of docs deleted because it's not always possible to quickly know that info.
@Override
public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
@@ -250,35 +319,19 @@ public class DirectUpdateHandler2 extend
deleteByQueryCommandsCumulative.incrementAndGet();
boolean madeIt=false;
try {
- Query q;
- try {
- // TODO: move this higher in the stack?
- QParser parser = QParser.getParser(cmd.query, "lucene", cmd.req);
- q = parser.getQuery();
- q = QueryUtils.makeQueryable(q);
-
- // peer-sync can cause older deleteByQueries to be executed and could
- // delete newer documents. We prevent this by adding a clause restricting
- // version.
- if ((cmd.getFlags() & UpdateCommand.PEER_SYNC) != 0) {
- BooleanQuery bq = new BooleanQuery();
- bq.add(q, Occur.MUST);
- SchemaField sf = core.getSchema().getField(VersionInfo.VERSION_FIELD);
- ValueSource vs = sf.getType().getValueSource(sf, null);
- ValueSourceRangeFilter filt = new ValueSourceRangeFilter(vs, null, Long.toString(Math.abs(cmd.version)), true, true);
- FunctionRangeQuery range = new FunctionRangeQuery(filt);
- bq.add(range, Occur.MUST);
- q = bq;
- }
-
-
-
- } catch (ParseException e) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
- }
+ Query q = getQuery(cmd);
boolean delAll = MatchAllDocsQuery.class == q.getClass();
+ // currently for testing purposes. Do a delete of complete index w/o worrying about versions, don't log, clean up most state in update log, etc
+ if (delAll && cmd.getVersion() == -Long.MAX_VALUE) {
+ synchronized (this) {
+ deleteAll();
+ ulog.deleteAll();
+ return;
+ }
+ }
+
//
// synchronized to prevent deleteByQuery from running during the "open new searcher"
// part of a commit. DBQ needs to signal that a fresh reader will be needed for
@@ -307,6 +360,31 @@ public class DirectUpdateHandler2 extend
}
}
+
+
+ /** Add a document execute the deletes as atomically as possible */
+ private void addAndDelete(AddUpdateCommand cmd, List<Query> dbqList) throws IOException {
+ Document luceneDocument = cmd.getLuceneDocument();
+ Term idTerm = new Term(idField.getName(), cmd.getIndexedId());
+
+ // see comment in deleteByQuery
+ synchronized (this) {
+ IndexWriter writer = solrCoreState.getIndexWriter(core);
+
+ writer.updateDocument(idTerm, luceneDocument);
+
+ for (Query q : dbqList) {
+ writer.deleteDocuments(q);
+ }
+
+ if (ulog != null) ulog.add(cmd, true);
+ }
+
+ }
+
+
+
+
@Override
public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
mergeIndexesCommands.incrementAndGet();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateCommand.java Tue Jul 3 15:05:30 2012
@@ -34,6 +34,7 @@ public abstract class UpdateCommand impl
public static int REPLAY = 0x00000002; // update command is from replaying a log.
public static int PEER_SYNC = 0x00000004; // update command is a missing update being provided by a peer.
public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
+ public static int CLEAR_CACHES = 0x00000010; // clear caches associated with the update log. used when applying reordered DBQ updates when doing an add.
public UpdateCommand(SolrQueryRequest req) {
this.req = req;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Tue Jul 3 15:05:30 2012
@@ -17,6 +17,7 @@
package org.apache.solr.update;
+import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -104,7 +105,9 @@ public class UpdateLog implements Plugin
private TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap
private final int numDeletesToKeep = 1000;
+ private final int numDeletesByQueryToKeep = 100;
public final int numRecordsToKeep = 100;
+
// keep track of deletes only... this is not updated on an add
private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
protected boolean removeEldestEntry(Map.Entry eldest) {
@@ -112,6 +115,18 @@ public class UpdateLog implements Plugin
}
};
+ public class DBQ {
+ public String q; // the query string
+ public long version; // positive version of the DBQ
+
+ @Override
+ public String toString() {
+ return "DBQ{version=" + version + ",q="+q+"}";
+ }
+ }
+
+ private LinkedList<DBQ> deleteByQueries = new LinkedList<DBQ>();
+
private String[] tlogFiles;
private File tlogDir;
private Collection<String> globalStrings;
@@ -207,6 +222,16 @@ public class UpdateLog implements Plugin
DeleteUpdate du = startingUpdates.deleteList.get(i);
oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
}
+
+ // populate recent deleteByQuery commands
+ for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) {
+ Update update = startingUpdates.deleteByQueryList.get(i);
+ List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
+ long version = (Long) dbq.get(1);
+ String q = (String) dbq.get(2);
+ trackDeleteByQuery(q, version);
+ }
+
} finally {
startingUpdates.close();
}
@@ -280,6 +305,11 @@ public class UpdateLog implements Plugin
public void add(AddUpdateCommand cmd) {
+ add(cmd, false);
+ }
+
+
+ public void add(AddUpdateCommand cmd, boolean clearCaches) {
// don't log if we are replaying from another log
// TODO: we currently need to log to maintain correct versioning, rtg, etc
// if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
@@ -293,21 +323,43 @@ public class UpdateLog implements Plugin
pos = tlog.write(cmd, operationFlags);
}
- // TODO: in the future we could support a real position for a REPLAY update.
- // Only currently would be useful for RTG while in recovery mode though.
- LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+ if (!clearCaches) {
+ // TODO: in the future we could support a real position for a REPLAY update.
+ // Only currently would be useful for RTG while in recovery mode though.
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion());
+
+ // only update our map if we're not buffering
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ map.put(cmd.getIndexedId(), ptr);
+ }
- // only update our map if we're not buffering
- if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
- map.put(cmd.getIndexedId(), ptr);
- }
+ if (trace) {
+ log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
+ }
+
+ } else {
+ // replicate the deleteByQuery logic. See deleteByQuery for comments.
+
+ map.clear();
+
+ try {
+ RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+ holder.decref();
+ } catch (Throwable e) {
+ SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+ }
+
+ if (trace) {
+ log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " clearCaches=true");
+ }
- if (trace) {
- log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
+
}
}
+
+
public void delete(DeleteUpdateCommand cmd) {
BytesRef br = cmd.getIndexedId();
@@ -350,6 +402,8 @@ public class UpdateLog implements Plugin
// affected and hence we must purge our caches.
map.clear();
+ trackDeleteByQuery(cmd.getQuery(), cmd.getVersion());
+
// oldDeletes.clear();
// We must cause a new IndexReader to be opened before anything looks at these caches again
@@ -373,6 +427,72 @@ public class UpdateLog implements Plugin
}
}
+ /** currently for testing only */
+ public void deleteAll() {
+ synchronized (this) {
+
+ try {
+ RefCounted<SolrIndexSearcher> holder = uhandler.core.openNewSearcher(true, true);
+ holder.decref();
+ } catch (Throwable e) {
+ SolrException.log(log, "Error opening realtime searcher for deleteByQuery", e);
+ }
+
+ map.clear();
+ oldDeletes.clear();
+ deleteByQueries.clear();
+ }
+ }
+
+
+ void trackDeleteByQuery(String q, long version) {
+ version = Math.abs(version);
+ DBQ dbq = new DBQ();
+ dbq.q = q;
+ dbq.version = version;
+
+ synchronized (this) {
+ if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) {
+ // common non-reordered case
+ deleteByQueries.addFirst(dbq);
+ } else {
+ // find correct insertion point
+ ListIterator<DBQ> iter = deleteByQueries.listIterator();
+ iter.next(); // we already checked the first element in the previous "if" clause
+ while (iter.hasNext()) {
+ DBQ oldDBQ = iter.next();
+ if (oldDBQ.version < version) {
+ iter.previous();
+ break;
+ } else if (oldDBQ.version == version && oldDBQ.q.equals(q)) {
+ // a duplicate
+ return;
+ }
+ }
+ iter.add(dbq); // this also handles the case of adding at the end when hasNext() == false
+ }
+
+ if (deleteByQueries.size() > numDeletesByQueryToKeep) {
+ deleteByQueries.removeLast();
+ }
+ }
+ }
+
+ public List<DBQ> getDBQNewer(long version) {
+ synchronized (this) {
+ if (deleteByQueries.isEmpty() || deleteByQueries.getFirst().version < version) {
+ // fast common case
+ return null;
+ }
+
+ List<DBQ> dbqList = new ArrayList<DBQ>();
+ for (DBQ dbq : deleteByQueries) {
+ if (dbq.version <= version) break;
+ dbqList.add(dbq);
+ }
+ return dbqList;
+ }
+ }
private void newMap() {
prevMap2 = prevMap;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Tue Jul 3 15:05:30 2012
@@ -443,6 +443,9 @@ public class DistributedUpdateProcessor
// realtime-get to work reliably.
// TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
// there may be other reasons in the future for a version on the commands
+
+ boolean checkDeleteByQueries = false;
+
if (versionsStored) {
long bucketVersion = bucket.highest;
@@ -484,7 +487,7 @@ public class DistributedUpdateProcessor
// if we aren't the leader, then we need to check that updates were not re-ordered
if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
// we're OK... this update has a version higher than anything we've seen
- // in this bucket so far, so we know that no reordering has yet occured.
+ // in this bucket so far, so we know that no reordering has yet occurred.
bucket.updateHighest(versionOnUpdate);
} else {
// there have been updates higher than the current update. we need to check
@@ -494,11 +497,16 @@ public class DistributedUpdateProcessor
// This update is a repeat, or was reordered. We need to drop this update.
return true;
}
+
+ // also need to re-apply newer deleteByQuery commands
+ checkDeleteByQueries = true;
}
}
}
+ // TODO: possibly set checkDeleteByQueries as a flag on the command?
doLocalAdd(cmd);
+
} // end synchronized (bucket)
} finally {
vinfo.unlockForUpdate();
@@ -654,7 +662,7 @@ public class DistributedUpdateProcessor
// FROM: we are a replica receiving a DBQ from our leader
// - log + execute the local DBQ
DistribPhase phase =
- DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+ DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
if (zkEnabled && DistribPhase.NONE == phase) {
boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
@@ -772,8 +780,6 @@ public class DistributedUpdateProcessor
}
-
- // TODO: need to handle reorders to replicas somehow
// forward to all replicas
if (leaderLogic && replicas != null) {
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Tue Jul 3 15:05:30 2012
@@ -63,6 +63,18 @@ public class TestRealTimeGet extends Sol
initCore("solrconfig-tlog.xml","schema15.xml");
}
+ // since we make up fake versions in these tests, we can get messed up by a DBQ with a real version
+ // since Solr can think following updates were reordered.
+ @Override
+ public void clearIndex() {
+ try {
+ deleteByQueryAndGetVersion("*:*", params("_version_", Long.toString(-Long.MAX_VALUE), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
@Test
public void testGetRealtime() throws Exception {
clearIndex();
@@ -212,7 +224,7 @@ public class TestRealTimeGet extends Sol
assertU(commit());
// make sure a reordered add doesn't take affect.
- updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+ long version2 = deleteByQueryAndGetVersion("id:2", null);
// test that it's still deleted
assertJQ(req("qt","/get","id","1")
@@ -220,9 +232,33 @@ public class TestRealTimeGet extends Sol
);
version = addAndGetVersion(sdoc("id","2"), null);
- long version2 = deleteByQueryAndGetVersion("id:2", null);
+ version2 = deleteByQueryAndGetVersion("id:2", null);
assertTrue(Math.abs(version2) > version );
-
+
+ // test that it's deleted
+ assertJQ(req("qt","/get","id","2")
+ ,"=={'doc':null}");
+
+
+ version2 = Math.abs(version2) + 1000;
+ updateJ(jsonAdd(sdoc("id","3", "_version_",Long.toString(version2+100))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+ updateJ(jsonAdd(sdoc("id","4", "_version_",Long.toString(version2+200))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+ // this should only affect id:3 so far
+ deleteByQueryAndGetVersion("id:(3 4 5 6)", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(-(version2+150))) );
+
+ assertJQ(req("qt","/get","id","3"),"=={'doc':null}");
+ assertJQ(req("qt","/get","id","4", "fl","id"),"=={'doc':{'id':'4'}}");
+
+ updateJ(jsonAdd(sdoc("id","5", "_version_",Long.toString(version2+201))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+ updateJ(jsonAdd(sdoc("id","6", "_version_",Long.toString(version2+101))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+ // the DBQ should also have caused id:6 to be removed
+ assertJQ(req("qt","/get","id","5", "fl","id"),"=={'doc':{'id':'5'}}");
+ assertJQ(req("qt","/get","id","6"),"=={'doc':null}");
+
+ assertU(commit());
+
}
@Test
@@ -425,7 +461,7 @@ public class TestRealTimeGet extends Sol
}
public String toString() {
- return "{version="+version+",val="+val+"\"";
+ return "{version="+version+",val="+val+"}";
}
}
@@ -978,7 +1014,7 @@ public class TestRealTimeGet extends Sol
final int commitPercent = 5 + random().nextInt(20);
final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
final int deletePercent = 4+random().nextInt(25);
- final int deleteByQueryPercent = 0; // delete-by-query can't be reordered on replicas
+ final int deleteByQueryPercent = 1+random().nextInt(7);
final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
int nWriteThreads = 5 + random().nextInt(25);
@@ -989,6 +1025,24 @@ public class TestRealTimeGet extends Sol
final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
int nReadThreads = 5 + random().nextInt(25);
+
+ /** // testing
+ final int commitPercent = 5;
+ final int softCommitPercent = 100; // what percent of the commits are soft
+ final int deletePercent = 0;
+ final int deleteByQueryPercent = 50;
+ final int ndocs = 1;
+ int nWriteThreads = 2;
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 101;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 1;
+ **/
+
+
initModel(ndocs);
final AtomicInteger numCommitting = new AtomicInteger();
@@ -1094,6 +1148,26 @@ public class TestRealTimeGet extends Sol
verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ verbose("deleteByQuery id",id,"val=",nextVal,"version",version);
+
+ Long returnedVersion = deleteByQueryAndGetVersion("id:"+Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+ // TODO: returning versions for these types of updates is redundant
+ // but if we do return, they had better be equal
+ if (returnedVersion != null) {
+ assertEquals(-version, returnedVersion.longValue());
+ }
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ verbose("deleteByQuery id", id, "val=",nextVal,"version",version,"DONE");
+
} else {
verbose("adding id", id, "val=", nextVal,"version",version);
@@ -1123,6 +1197,7 @@ public class TestRealTimeGet extends Sol
}
} catch (Throwable e) {
operations.set(-1L);
+ log.error("",e);
throw new RuntimeException(e);
}
}
@@ -1185,6 +1260,7 @@ public class TestRealTimeGet extends Sol
}
} catch (Throwable e) {
operations.set(-1L);
+ log.error("",e);
throw new RuntimeException(e);
}
}
@@ -1225,7 +1301,7 @@ public class TestRealTimeGet extends Sol
final int commitPercent = 5 + random().nextInt(10);
final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
final int deletePercent = 4+random().nextInt(25);
- final int deleteByQueryPercent = 0; // real-time get isn't currently supported with delete-by-query
+ final int deleteByQueryPercent = random().nextInt(5); // real-time get isn't currently supported with delete-by-query
final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
int nWriteThreads = 2 + random().nextInt(10); // fewer write threads to give recovery thread more of a chance
@@ -1364,6 +1440,26 @@ public class TestRealTimeGet extends Sol
verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
} else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ verbose("deleteByQuery id",id,"val=",nextVal,"version",version);
+
+ Long returnedVersion = deleteByQueryAndGetVersion("id:"+Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+
+ // TODO: returning versions for these types of updates is redundant
+ // but if we do return, they had better be equal
+ if (returnedVersion != null) {
+ assertEquals(-version, returnedVersion.longValue());
+ }
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ verbose("deleteByQuery id", id, "val=",nextVal,"version",version,"DONE");
+
} else {
verbose("adding id", id, "val=", nextVal,"version",version);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java?rev=1356778&r1=1356777&r2=1356778&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRecovery.java Tue Jul 3 15:05:30 2012
@@ -69,6 +69,19 @@ public class TestRecovery extends SolrTe
}
}
+
+ // since we make up fake versions in these tests, we can get messed up by a DBQ with a real version
+ // since Solr can think following updates were reordered.
+ @Override
+ public void clearIndex() {
+ try {
+ deleteByQueryAndGetVersion("*:*", params("_version_", Long.toString(-Long.MAX_VALUE), DISTRIB_UPDATE_PARAM,FROM_LEADER));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
@Test
public void testLogReplay() throws Exception {
try {