You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/04 16:36:17 UTC
svn commit: r1197600 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/search/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/ example/solr/conf/
Author: yonik
Date: Fri Nov 4 15:36:16 2011
New Revision: 1197600
URL: http://svn.apache.org/viewvc?rev=1197600&view=rev
Log:
SOLR-2816: keep track of deletes, lookup versions from index, handle out of order updates
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java
lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml
lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java Fri Nov 4 15:36:16 2011
@@ -496,6 +496,36 @@ public class SolrIndexSearcher extends I
return id == DocIdSetIterator.NO_MORE_DOCS ? -1 : id;
}
+ /** Look up the docid by the unique key field, and return the id *within* the leaf reader in the low 32 bits, and the index of the leaf reader in the high 32 bits.
+ * -1 is returned if not found.
+ * @lucene.internal
+ */
+ public long lookupId(BytesRef idBytes) throws IOException {
+ String field = schema.getUniqueKeyField().getName();
+ final AtomicReaderContext[] leaves = leafContexts;
+
+
+ for (int i=0; i<leaves.length; i++) {
+ final AtomicReaderContext leaf = leaves[i];
+ final IndexReader reader = leaf.reader;
+
+ final Fields fields = reader.fields();
+ if (fields == null) continue;
+ final Terms terms = fields.terms(field);
+ if (terms == null) continue;
+ final Bits liveDocs = reader.getLiveDocs();
+ final DocsEnum docs = terms.docs(liveDocs, idBytes, null);
+ if (docs == null) continue;
+ int id = docs.nextDoc();
+ if (id == DocIdSetIterator.NO_MORE_DOCS) continue;
+ assert docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS;
+
+ return (((long)i) << 32) | id;
+ }
+
+ return -1;
+ }
+
/**
* Compute and cache the DocSet that matches a query.
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Fri Nov 4 15:36:16 2011
@@ -17,7 +17,6 @@
package org.apache.solr.update;
-import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -27,7 +26,6 @@ import org.apache.solr.common.util.JavaB
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.schema.SchemaField;
import java.io.*;
import java.nio.ByteBuffer;
@@ -35,7 +33,6 @@ import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
/** @lucene.experimental */
class NullUpdateLog extends UpdateLog {
@@ -111,6 +108,15 @@ public class FSUpdateLog extends UpdateL
private TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap
private TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap
+ private final int numDeletesToKeep = 1000;
+ // keep track of deletes only... this is not updated on an add
+ private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > numDeletesToKeep;
+ }
+ };
+
+
private String[] tlogFiles;
private File tlogDir;
private Collection<String> globalStrings;
@@ -144,7 +150,7 @@ public class FSUpdateLog extends UpdateL
tlogFiles = getLogList(tlogDir);
id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
- versionInfo = new VersionInfo(uhandler, 64);
+ versionInfo = new VersionInfo(uhandler, 256);
}
static class LogPtr {
@@ -201,6 +207,8 @@ public class FSUpdateLog extends UpdateL
long pos = tlog.writeDelete(cmd);
LogPtr ptr = new LogPtr(pos, cmd.version);
map.put(br, ptr);
+
+ oldDeletes.put(br, ptr);
// SolrCore.verbose("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
}
}
@@ -300,18 +308,18 @@ public class FSUpdateLog extends UpdateL
synchronized (this) {
entry = map.get(indexedId);
lookupLog = tlog; // something found in "map" will always be in "tlog"
- // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " + System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog);
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
if (entry == null && prevMap != null) {
entry = prevMap.get(indexedId);
// something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
lookupLog = prevMapLog;
- // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(prevMap),"got",entry,"lookupLog=",lookupLog);
}
if (entry == null && prevMap2 != null) {
entry = prevMap2.get(indexedId);
// something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
lookupLog = prevMapLog2;
- // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+ // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(prevMap2),"got",entry,"lookupLog=",lookupLog);
}
if (entry == null) {
@@ -329,6 +337,10 @@ public class FSUpdateLog extends UpdateL
}
+ // This method works like realtime-get... it only guarantees to return the latest
+ // version of the *completed* update. There can be updates in progress concurrently
+ // that have already grabbed higher version numbers. Higher level coordination or
+ // synchronization is needed for stronger guarantees (as VersionProcessor does).
@Override
public Long lookupVersion(BytesRef indexedId) {
LogPtr entry;
@@ -337,26 +349,44 @@ public class FSUpdateLog extends UpdateL
synchronized (this) {
entry = map.get(indexedId);
lookupLog = tlog; // something found in "map" will always be in "tlog"
- // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " + System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog);
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
if (entry == null && prevMap != null) {
entry = prevMap.get(indexedId);
// something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
lookupLog = prevMapLog;
- // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(prevMap),"got",entry,"lookupLog=",lookupLog);
}
if (entry == null && prevMap2 != null) {
entry = prevMap2.get(indexedId);
// something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
lookupLog = prevMapLog2;
- // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " + System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(prevMap2),"got",entry,"lookupLog=",lookupLog);
}
}
- if (entry == null) {
- return null;
+ if (entry != null) {
+ return entry.version;
+ }
+
+ // Now check real index
+ Long version = versionInfo.getVersionFromIndex(indexedId);
+
+ if (version != null) {
+ return version;
}
- return entry.version;
+ // We can't get any version info for deletes from the index, so if the doc
+ // wasn't found, check a cache of recent deletes.
+
+ synchronized (this) {
+ entry = oldDeletes.get(indexedId);
+ }
+
+ if (entry != null) {
+ return entry.version;
+ }
+
+ return null;
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java Fri Nov 4 15:36:16 2011
@@ -21,6 +21,13 @@ package org.apache.solr.update;
// TODO: store the highest possible in the index on a commit (but how to not block adds?)
// TODO: could also store highest possible in the transaction log after a commit.
// Or on a new index, just scan "version" for the max?
+/** @lucene.internal */
public class VersionBucket {
- long highest;
+ public long highest;
+
+ public void updateHighest(long val) {
+ if (highest != 0) {
+ highest = Math.max(highest, Math.abs(val));
+ }
+ }
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java Fri Nov 4 15:36:16 2011
@@ -17,19 +17,39 @@
package org.apache.solr.update;
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.queries.function.DocValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RefCounted;
+import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class VersionInfo {
public static final String VERSION_FIELD="_version_";
+ private SolrCore core;
+ private UpdateHandler updateHandler;
private final VersionBucket[] buckets;
private SchemaField versionField;
+ private SchemaField idField;
public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
- versionField = updateHandler.core.getSchema().getFieldOrNull("_version_");
+ this.updateHandler = updateHandler;
+ this.core = updateHandler.core;
+ versionField = core.getSchema().getFieldOrNull("_version_");
+ idField = core.getSchema().getUniqueKeyField();
buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
for (int i=0; i<buckets.length; i++) {
buckets[i] = new VersionBucket();
@@ -104,4 +124,34 @@ public class VersionInfo {
return buckets[slot];
}
+ public Long lookupVersion(BytesRef idBytes) {
+ return updateHandler.ulog.lookupVersion(idBytes);
+ }
+
+ public Long getVersionFromIndex(BytesRef idBytes) {
+ // TODO: we could cache much of this and invalidate during a commit.
+ // TODO: most DocValues classes are threadsafe - expose which.
+
+ RefCounted<SolrIndexSearcher> newestSearcher = core.getNewestSearcher(true);
+ try {
+ SolrIndexSearcher searcher = newestSearcher.get();
+ long lookup = searcher.lookupId(idBytes);
+ if (lookup < 0) return null;
+
+ ValueSource vs = versionField.getType().getValueSource(versionField, null);
+ Map context = ValueSource.newContext(searcher);
+ vs.createWeight(context, searcher);
+ DocValues dv = vs.getValues(context, searcher.getTopReaderContext().leaves()[(int)(lookup>>32)]);
+ long ver = dv.longVal((int)lookup);
+ return ver;
+
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
+ } finally {
+ if (newestSearcher != null) {
+ newestSearcher.decref();
+ }
+ }
+ }
+
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java Fri Nov 4 15:36:16 2011
@@ -18,16 +18,27 @@
package org.apache.solr.update.processor;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queries.function.DocValues;
+import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.*;
+import org.apache.solr.util.RefCounted;
import org.omg.PortableInterceptor.RequestInfo;
@@ -95,6 +106,29 @@ class VersionProcessor extends UpdateReq
return;
}
+ boolean leaderForUpdate = true; // TODO: figure out if I'm the leader
+ boolean needToForward = false; // TODO: figure out if I need to forward this to the leader
+
+ if (needToForward) {
+ // TODO: forward update to the leader
+ return;
+ }
+
+ // at this point, there is an update we need to try and apply.
+ // we may or may not be the leader.
+
+ // Find any existing version in the document
+ long versionOnUpdate = 0;
+ SolrInputField versionField = cmd.getSolrInputDocument().getField(VersionInfo.VERSION_FIELD);
+ if (versionField != null) {
+ Object o = versionField.getValue();
+ versionOnUpdate = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
+ } else {
+ // TODO: check for the version in the request params (this will be for user provided versions and optimistic concurrency only)
+ }
+
+
+
VersionBucket bucket = vinfo.bucket(hash(cmd));
synchronized (bucket) {
// we obtain the version when synchronized and then do the add so we can ensure that
@@ -106,9 +140,31 @@ class VersionProcessor extends UpdateReq
// TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
// there may be other reasons in the future for a version on the commands
if (versionsStored) {
- long version = vinfo.getNewClock();
- cmd.setVersion(version);
- cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+ long bucketVersion = bucket.highest;
+
+ if (leaderForUpdate) {
+ long version = vinfo.getNewClock();
+ cmd.setVersion(version);
+ cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+ bucket.updateHighest(version);
+ } else {
+ // The leader forwarded us this update.
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ // TODO: do we need to add anything to the response?
+ return;
+ }
+ }
+ }
}
super.processAdd(cmd);
@@ -144,12 +200,50 @@ class VersionProcessor extends UpdateReq
return;
}
+ boolean leaderForUpdate = true; // TODO: figure out if I'm the leader
+ boolean needToForward = false; // TODO: figure out if I need to forward this to the leader
+
+ if (needToForward) {
+ // TODO: forward update to the leader
+ return;
+ }
+
+ // at this point, there is an update we need to try and apply.
+ // we may or may not be the leader.
+
+ // Find the version
+ long versionOnUpdate = 0;
+ // TODO: check for the version in the request params (this will be for user provided versions and optimistic concurrency only)
+
VersionBucket bucket = vinfo.bucket(hash(cmd));
synchronized (bucket) {
if (versionsStored) {
- long version = -vinfo.getNewClock(); // deletes have negative version
- cmd.setVersion(version);
+ long bucketVersion = bucket.highest;
+
+ if (leaderForUpdate) {
+ long version = vinfo.getNewClock();
+ cmd.setVersion(-version);
+ bucket.updateHighest(version);
+ } else {
+ // The leader forwarded us this update.
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ // TODO: do we need to add anything to the response?
+ return;
+ }
+ }
+ }
}
+
super.processDelete(cmd);
}
@@ -183,6 +277,8 @@ class VersionProcessor extends UpdateReq
{
super.processRollback(cmd);
}
+
+
}
Modified: lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml (original)
+++ lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml Fri Nov 4 15:36:16 2011
@@ -527,6 +527,9 @@
<field name="payloads" type="payloads" indexed="true" stored="true"/>
+
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+
<!-- Uncommenting the following will create a "timestamp" field using
a default value of "NOW" to indicate when each document was indexed.
-->
Modified: lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml (original)
+++ lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml Fri Nov 4 15:36:16 2011
@@ -354,11 +354,9 @@
<!-- Enables a transaction log, currently used for real-time get.
"dir" - the target directory for transaction logs, defaults to the
solr data directory. -->
- <!--
<updateLog class="solr.FSUpdateLog">
<str name="dir">${solr.data.dir:}</str>
</updateLog>
- -->
</updateHandler>