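You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive; the link was lost in this plain-text rendering.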
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/11/04 16:36:17 UTC

svn commit: r1197600 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/search/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ example/solr/conf/

Author: yonik
Date: Fri Nov  4 15:36:16 2011
New Revision: 1197600

URL: http://svn.apache.org/viewvc?rev=1197600&view=rev
Log:
SOLR-2816: keep track of deletes, lookup versions from index, handle out of order updates

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java
    lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml
    lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java Fri Nov  4 15:36:16 2011
@@ -496,6 +496,36 @@ public class SolrIndexSearcher extends I
     return id == DocIdSetIterator.NO_MORE_DOCS ? -1 : id;
   }
 
+  /** lookup the docid by the unique key field, and return the id *within* the leaf reader in the low 32 bits, and the index of the leaf reader in the high 32 bits.
+   * -1 is returned if not found.
+   * @lucene.internal
+   */
+  public long lookupId(BytesRef idBytes) throws IOException {
+    String field = schema.getUniqueKeyField().getName();
+    final AtomicReaderContext[] leaves = leafContexts;
+
+
+    for (int i=0; i<leaves.length; i++) {
+      final AtomicReaderContext leaf = leaves[i];
+      final IndexReader reader = leaf.reader;
+
+      final Fields fields = reader.fields();
+      if (fields == null) continue;
+      final Terms terms = fields.terms(field);
+      if (terms == null) continue;
+      final Bits liveDocs = reader.getLiveDocs();
+      final DocsEnum docs = terms.docs(liveDocs, idBytes, null);
+      if (docs == null) continue;
+      int id = docs.nextDoc();
+      if (id == DocIdSetIterator.NO_MORE_DOCS) continue;
+      assert docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS;
+
+      return (((long)i) << 32) | id;
+    }
+
+    return -1;
+  }
+
 
   /**
    * Compute and cache the DocSet that matches a query.

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/FSUpdateLog.java Fri Nov  4 15:36:16 2011
@@ -17,7 +17,6 @@
 
 package org.apache.solr.update;
 
-import org.apache.lucene.util.BitUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -27,7 +26,6 @@ import org.apache.solr.common.util.JavaB
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.schema.SchemaField;
 
 import java.io.*;
 import java.nio.ByteBuffer;
@@ -35,7 +33,6 @@ import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 /** @lucene.experimental */
 class NullUpdateLog extends UpdateLog {
@@ -111,6 +108,15 @@ public class FSUpdateLog extends UpdateL
   private TransactionLog prevMapLog;  // the transaction log used to look up entries found in prevMap
   private TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap
 
+  private final int numDeletesToKeep = 1000;
+  // keep track of deletes only... this is not updated on an add
+  private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
+    protected boolean removeEldestEntry(Map.Entry eldest) {
+      return size() > numDeletesToKeep;
+    }
+  };
+
+
   private String[] tlogFiles;
   private File tlogDir;
   private Collection<String> globalStrings;
@@ -144,7 +150,7 @@ public class FSUpdateLog extends UpdateL
     tlogFiles = getLogList(tlogDir);
     id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
 
-    versionInfo = new VersionInfo(uhandler, 64);
+    versionInfo = new VersionInfo(uhandler, 256);
   }
 
   static class LogPtr {
@@ -201,6 +207,8 @@ public class FSUpdateLog extends UpdateL
       long pos = tlog.writeDelete(cmd);
       LogPtr ptr = new LogPtr(pos, cmd.version);
       map.put(br, ptr);
+
+      oldDeletes.put(br, ptr);
       // SolrCore.verbose("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
     }
   }
@@ -300,18 +308,18 @@ public class FSUpdateLog extends UpdateL
     synchronized (this) {
       entry = map.get(indexedId);
       lookupLog = tlog;  // something found in "map" will always be in "tlog"
-      // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " +  System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog);
+      // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       if (entry == null && prevMap != null) {
         entry = prevMap.get(indexedId);
         // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
         lookupLog = prevMapLog;
-        // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " +  System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+        // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
       if (entry == null && prevMap2 != null) {
         entry = prevMap2.get(indexedId);
         // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
         lookupLog = prevMapLog2;
-        // SolrCore.verbose("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " +  System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+        // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
 
       if (entry == null) {
@@ -329,6 +337,10 @@ public class FSUpdateLog extends UpdateL
 
   }
 
+  // This method works like realtime-get... it only guarantees to return the latest
+  // version of the *completed* update.  There can be updates in progress concurrently
+  // that have already grabbed higher version numbers.  Higher level coordination or
+  // synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
   @Override
   public Long lookupVersion(BytesRef indexedId) {
     LogPtr entry;
@@ -337,26 +349,44 @@ public class FSUpdateLog extends UpdateL
     synchronized (this) {
       entry = map.get(indexedId);
       lookupLog = tlog;  // something found in "map" will always be in "tlog"
-      // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in map " +  System.identityHashCode(map) + " got " + entry + " lookupLog=" + lookupLog);
+      // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       if (entry == null && prevMap != null) {
         entry = prevMap.get(indexedId);
         // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
         lookupLog = prevMapLog;
-        // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap " +  System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
       if (entry == null && prevMap2 != null) {
         entry = prevMap2.get(indexedId);
         // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
         lookupLog = prevMapLog2;
-        // System.out.println("TLOG: lookup: for id " + indexedId.utf8ToString() + " in prevMap2 " +  System.identityHashCode(prevMap) + " got " + entry + " lookupLog="+lookupLog);
+        // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
     }
 
-    if (entry == null) {
-      return null;
+    if (entry != null) {
+      return entry.version;
+    }
+
+    // Now check real index
+    Long version = versionInfo.getVersionFromIndex(indexedId);
+
+    if (version != null) {
+      return version;
     }
 
-    return entry.version;
+    // We can't get any version info for deletes from the index, so if the doc
+    // wasn't found, check a cache of recent deletes.
+
+    synchronized (this) {
+      entry = oldDeletes.get(indexedId);
+    }
+
+    if (entry != null) {
+      return entry.version;
+    }
+
+    return null;
   }
 
 

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionBucket.java Fri Nov  4 15:36:16 2011
@@ -21,6 +21,13 @@ package org.apache.solr.update;
 // TODO: store the highest possible in the index on a commit (but how to not block adds?)
 // TODO: could also store highest possible in the transaction log after a commit.
 // Or on a new index, just scan "version" for the max?
+/** @lucene.internal */
 public class VersionBucket {
-  long highest;
+  public long highest;
+
+  public void updateHighest(long val) {
+    if (highest != 0) {
+      highest = Math.max(highest, Math.abs(val));
+    }
+  }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/VersionInfo.java Fri Nov  4 15:36:16 2011
@@ -17,19 +17,39 @@
 
 package org.apache.solr.update;
 
+import org.apache.lucene.index.DocsEnum;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiFields;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.queries.function.DocValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RefCounted;
 
+import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class VersionInfo {
   public static final String VERSION_FIELD="_version_";
 
+  private SolrCore core;
+  private UpdateHandler updateHandler;
   private final VersionBucket[] buckets;
   private SchemaField versionField;
+  private SchemaField idField;
 
   public VersionInfo(UpdateHandler updateHandler, int nBuckets) {
-    versionField = updateHandler.core.getSchema().getFieldOrNull("_version_");
+    this.updateHandler = updateHandler;
+    this.core = updateHandler.core;
+    versionField = core.getSchema().getFieldOrNull("_version_");
+    idField = core.getSchema().getUniqueKeyField();
     buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
     for (int i=0; i<buckets.length; i++) {
       buckets[i] = new VersionBucket();
@@ -104,4 +124,34 @@ public class VersionInfo {
     return buckets[slot];
   }
 
+  public Long lookupVersion(BytesRef idBytes) {
+    return updateHandler.ulog.lookupVersion(idBytes);
+  }
+
+  public Long getVersionFromIndex(BytesRef idBytes) {
+    // TODO: we could cache much of this and invalidate during a commit.
+    // TODO: most DocValues classes are threadsafe - expose which.
+
+    RefCounted<SolrIndexSearcher> newestSearcher = core.getNewestSearcher(true);
+    try {
+      SolrIndexSearcher searcher = newestSearcher.get();
+      long lookup = searcher.lookupId(idBytes);
+      if (lookup < 0) return null;
+
+      ValueSource vs = versionField.getType().getValueSource(versionField, null);
+      Map context = ValueSource.newContext(searcher);
+      vs.createWeight(context, searcher);
+      DocValues dv = vs.getValues(context, searcher.getTopReaderContext().leaves()[(int)(lookup>>32)]);
+      long ver = dv.longVal((int)lookup);
+      return ver;
+
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
+    } finally {
+      if (newestSearcher != null) {
+        newestSearcher.decref();
+      }
+    }
+  }
+
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/VersionProcessorFactory.java Fri Nov  4 15:36:16 2011
@@ -18,16 +18,27 @@
 package org.apache.solr.update.processor;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queries.function.DocValues;
+import org.apache.lucene.queries.function.ValueSource;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.*;
+import org.apache.solr.util.RefCounted;
 import org.omg.PortableInterceptor.RequestInfo;
 
 
@@ -95,6 +106,29 @@ class VersionProcessor extends UpdateReq
       return;
     }
 
+    boolean leaderForUpdate = true; // TODO: figure out if I'm the leader
+    boolean needToForward = false;  // TODO: figure out if I need to forward this to the leader
+
+    if (needToForward) {
+      // TODO: forward update to the leader
+      return;
+    }
+
+    // at this point, there is an update we need to try and apply.
+    // we may or may not be the leader.
+
+    // Find any existing version in the document
+    long versionOnUpdate = 0;
+    SolrInputField versionField = cmd.getSolrInputDocument().getField(VersionInfo.VERSION_FIELD);
+    if (versionField != null) {
+      Object o = versionField.getValue();
+      versionOnUpdate = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
+    } else {
+      // TODO: check for the version in the request params (this will be for user provided versions and optimistic concurrency only)
+    }
+
+
+
     VersionBucket bucket = vinfo.bucket(hash(cmd));
     synchronized (bucket) {
       // we obtain the version when synchronized and then do the add so we can ensure that
@@ -106,9 +140,31 @@ class VersionProcessor extends UpdateReq
       // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
       // there may be other reasons in the future for a version on the commands
       if (versionsStored) {
-        long version = vinfo.getNewClock();
-        cmd.setVersion(version);
-        cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+        long bucketVersion = bucket.highest;
+
+        if (leaderForUpdate) {
+          long version = vinfo.getNewClock();
+          cmd.setVersion(version);
+          cmd.getSolrInputDocument().setField(VersionInfo.VERSION_FIELD, version);
+          bucket.updateHighest(version);
+        } else {
+          // The leader forwarded us this update.
+          // if we aren't the leader, then we need to check that updates were not re-ordered
+          if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+            // we're OK... this update has a version higher than anything we've seen
+            // in this bucket so far, so we know that no reordering has yet occured.
+            bucket.updateHighest(versionOnUpdate);
+          } else {
+            // there have been updates higher than the current update.  we need to check
+            // the specific version for this id.
+            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+            if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+              // This update is a repeat, or was reordered.  We need to drop this update.
+              // TODO: do we need to add anything to the response?
+              return;
+            }
+          }
+        }
       }
 
       super.processAdd(cmd);
@@ -144,12 +200,50 @@ class VersionProcessor extends UpdateReq
       return;
     }
 
+    boolean leaderForUpdate = true; // TODO: figure out if I'm the leader
+    boolean needToForward = false;  // TODO: figure out if I need to forward this to the leader
+
+    if (needToForward) {
+      // TODO: forward update to the leader
+      return;
+    }
+
+    // at this point, there is an update we need to try and apply.
+    // we may or may not be the leader.
+
+    // Find the version
+    long versionOnUpdate = 0;
+   // TODO: check for the version in the request params (this will be for user provided versions and optimistic concurrency only)
+
     VersionBucket bucket = vinfo.bucket(hash(cmd));
     synchronized (bucket) {
       if (versionsStored) {
-        long version =  -vinfo.getNewClock();    // deletes have negative version
-        cmd.setVersion(version);
+        long bucketVersion = bucket.highest;
+
+        if (leaderForUpdate) {
+          long version = vinfo.getNewClock();
+          cmd.setVersion(-version);
+          bucket.updateHighest(version);
+        } else {
+          // The leader forwarded us this update.
+          // if we aren't the leader, then we need to check that updates were not re-ordered
+          if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+            // we're OK... this update has a version higher than anything we've seen
+            // in this bucket so far, so we know that no reordering has yet occured.
+            bucket.updateHighest(versionOnUpdate);
+          } else {
+            // there have been updates higher than the current update.  we need to check
+            // the specific version for this id.
+            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+            if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+              // This update is a repeat, or was reordered.  We need to drop this update.
+              // TODO: do we need to add anything to the response?
+              return;
+            }
+          }
+        }
       }
+
       super.processDelete(cmd);
     }
 
@@ -183,6 +277,8 @@ class VersionProcessor extends UpdateReq
   {
     super.processRollback(cmd);
   }
+
+
 }
 
 

Modified: lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml (original)
+++ lucene/dev/branches/solrcloud/solr/example/solr/conf/schema.xml Fri Nov  4 15:36:16 2011
@@ -527,6 +527,9 @@
 
    <field name="payloads" type="payloads" indexed="true" stored="true"/>
 
+
+   <field name="_version_" type="long" indexed="true" stored="true"/>
+
    <!-- Uncommenting the following will create a "timestamp" field using
         a default value of "NOW" to indicate when each document was indexed.
      -->

Modified: lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml?rev=1197600&r1=1197599&r2=1197600&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml (original)
+++ lucene/dev/branches/solrcloud/solr/example/solr/conf/solrconfig.xml Fri Nov  4 15:36:16 2011
@@ -354,11 +354,9 @@
     <!-- Enables a transaction log, currently used for real-time get.
          "dir" - the target directory for transaction logs, defaults to the
             solr data directory.  --> 
-    <!--
     <updateLog class="solr.FSUpdateLog">
       <str name="dir">${solr.data.dir:}</str>
     </updateLog>
-    -->
    
 
   </updateHandler>