You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2013/10/31 20:13:36 UTC

svn commit: r1537587 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/component/ core/src/java/org/apache/solr/update/processor/ core/src/test-files/solr/collection1/conf/ core/src/test/org/apache/solr/cloud/ core/src/test/org/apac...

Author: yonik
Date: Thu Oct 31 19:13:35 2013
New Revision: 1537587

URL: http://svn.apache.org/r1537587
Log:
SOLR-5374: user version update processor

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java   (with props)
    lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java   (with props)
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1537587&r1=1537586&r2=1537587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Oct 31 19:13:35 2013
@@ -115,6 +115,11 @@ New Features
  * SOLR-5406: CloudSolrServer failed to propagate request parameters
    along with delete updates. (yonik)
 
+ * SOLR-5374: Support user configured doc-centric versioning rules
+   via the optional DocBasedVersionConstraintsProcessorFactory
+   update processor (Hossman, yonik)
+
+
 
 Bug Fixes
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1537587&r1=1537586&r2=1537587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Thu Oct 31 19:13:35 2013
@@ -203,31 +203,46 @@ public class RealTimeGetComponent extend
 
   }
 
+
+  public static SolrInputDocument DELETED = new SolrInputDocument();
+
+  /** returns the SolrInputDocument from the current tlog, or DELETED if it has been deleted, or
+   * null if there is no record of it in the current update log.  If null is returned, it could
+   * still be in the latest index.
+   */
+  public static SolrInputDocument getInputDocumentFromTlog(SolrCore core, BytesRef idBytes) {
+
+    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+
+    if (ulog != null) {
+      Object o = ulog.lookup(idBytes);
+      if (o != null) {
+        // should currently be a List<Oper,Ver,Doc/Id>
+        List entry = (List)o;
+        assert entry.size() >= 3;
+        int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
+        switch (oper) {
+          case UpdateLog.ADD:
+            return (SolrInputDocument)entry.get(entry.size()-1);
+          case UpdateLog.DELETE:
+            return DELETED;
+          default:
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
+        }
+      }
+    }
+
+    return null;
+  }
+
   public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException {
     SolrInputDocument sid = null;
     RefCounted<SolrIndexSearcher> searcherHolder = null;
     try {
       SolrIndexSearcher searcher = null;
-      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-
-
-      if (ulog != null) {
-        Object o = ulog.lookup(idBytes);
-        if (o != null) {
-          // should currently be a List<Oper,Ver,Doc/Id>
-          List entry = (List)o;
-          assert entry.size() >= 3;
-          int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
-          switch (oper) {
-            case UpdateLog.ADD:
-              sid = (SolrInputDocument)entry.get(entry.size()-1);
-              break;
-            case UpdateLog.DELETE:
-              return null;
-            default:
-              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
-          }
-        }
+      sid = getInputDocumentFromTlog(core, idBytes);
+      if (sid == DELETED) {
+        return null;
       }
 
       if (sid == null) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1537587&r1=1537586&r2=1537587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Oct 31 19:13:35 2013
@@ -1111,6 +1111,25 @@ public class DistributedUpdateProcessor 
     }
   }
 
+  // internal helper method to tell if we are the leader for an add or deleteById update
+  boolean isLeader(UpdateCommand cmd) {
+    updateCommand = cmd;
+
+    if (zkEnabled) {
+      zkCheck();
+      if (cmd instanceof AddUpdateCommand) {
+        AddUpdateCommand acmd = (AddUpdateCommand)cmd;
+        nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument());
+      } else if (cmd instanceof DeleteUpdateCommand) {
+        DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
+        nodes = setupRequest(dcmd.getId(), null);
+      }
+    } else {
+      isLeader = getNonZkLeaderAssumption(req);
+    }
+
+    return isLeader;
+  }
 
   private void zkCheck() {
     if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.FieldType;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.update.VersionInfo;
+import org.apache.solr.util.RefCounted;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
+import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * <p>
+ * This Factory generates an UpdateProcessor that helps to enforce Version 
+ * constraints on documents based on per-document version numbers using a configured 
+ * name of a <code>versionField</code>.  It should be configured on the "default"
+ * update processor somewhere before the DistributedUpdateProcessorFactory.
+ * As an example, see the solrconfig.xml that the tests use:
+ * solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml
+ * </p>
+ * <p>
+ * When documents are added through this processor, if a document with the same 
+ * unique key already exists in the collection, then the value of the 
+ * <code>versionField</code> in the <i>existing</i> document is not less then the 
+ * field value in the <i>new</i> document then the new document is rejected with a 
+ * 409 Version Conflict error.
+ * </p>
+ * <p>
+ * In addition to the mandatory <code>versionField</code> init param, two additional 
+ * optional init params affect the behavior of this factory:
+ * </p>
+ * <ul>
+ *   <li><code>deleteVersionParam</code> - This string parameter controls whether this 
+ *     processor will intercept and inspect Delete By Id commands in addition to adding 
+ *     documents.  If specified, then the value will specify the name of a request 
+ *     paramater which becomes  mandatory for all Delete By Id commands.  This param 
+ *     must then be used to specify the document version associated with the delete.
+ *     If the version specified using this param is not greater then the value in the 
+ *     <code>versionField</code> for any existing document, then the delete will fail 
+ *     with a 409 Version Conflict error.  When using this param, Any Delete By Id 
+ *     command with a high enough document version number to succeed will be internally 
+ *     converted into an Add Document command that replaces the existing document with 
+ *     a new one which is empty except for the Unique Key and <code>versionField</code> 
+ *     to keeping a record of the deleted version so future Add Document commands will 
+ *     fail if their "new" version is not high enough.</li>
+ *
+ *   <li><code>ignoreOldUpdates</code> - This boolean parameter defaults to 
+ *     <code>false</code>, but if set to <code>true</code> causes any update with a 
+ *     document version that is not great enough to be silently ignored (and return 
+ *     a status 200 to the client) instead of generating a 409 Version Conflict error.
+ *   </li>
+ * </ul>
+ */
+public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestProcessorFactory implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
+  public final static Logger log = LoggerFactory.getLogger(DocBasedVersionConstraintsProcessorFactory.class);
+
+  private boolean ignoreOldUpdates = false;
+  private String versionField = null;
+  private String deleteVersionParamName = null;
+  private boolean useFieldCache;
+
+  @Override
+  public void init( NamedList args )  {
+
+    Object tmp = args.remove("versionField");
+    if (null == tmp) {
+      throw new SolrException(SERVER_ERROR, 
+                              "'versionField' must be configured");
+    }
+    if (! (tmp instanceof String) ) {
+      throw new SolrException(SERVER_ERROR, 
+                              "'versionField' must be configured as a <str>");
+    }
+    versionField = tmp.toString();
+
+    // optional
+    tmp = args.remove("deleteVersionParam");
+    if (null != tmp) {
+      if (! (tmp instanceof String) ) {
+        throw new SolrException(SERVER_ERROR, 
+                                "'deleteVersionParam' must be configured as a <str>");
+      }
+      deleteVersionParamName = tmp.toString();
+    }
+
+    // optional - defaults to false
+    tmp = args.remove("ignoreOldUpdates");
+    if (null != tmp) {
+      if (! (tmp instanceof Boolean) ) {
+        throw new SolrException(SERVER_ERROR, 
+                                "'ignoreOldUpdates' must be configured as a <bool>");
+      }
+      ignoreOldUpdates = ((Boolean)tmp).booleanValue();
+    }
+    super.init(args);
+  }
+  
+
+  public UpdateRequestProcessor getInstance(SolrQueryRequest req, 
+                                            SolrQueryResponse rsp, 
+                                            UpdateRequestProcessor next ) {
+    return new DocBasedVersionConstraintsProcessor(versionField, 
+                                                   ignoreOldUpdates,
+                                                   deleteVersionParamName,
+                                                   useFieldCache,
+                                                   req, rsp, next);
+  }
+
+  @Override
+  public void inform(SolrCore core) {
+
+    if (core.getUpdateHandler().getUpdateLog() == null) {
+      throw new SolrException(SERVER_ERROR,
+          "updateLog must be enabled.");
+    }
+
+    if (core.getLatestSchema().getUniqueKeyField() == null) {
+      throw new SolrException(SERVER_ERROR,
+          "schema must have uniqueKey defined.");
+    }
+
+    SchemaField userVersionField = core.getLatestSchema().getField(versionField);
+    if (userVersionField == null || !userVersionField.stored() || userVersionField.multiValued()) {
+      throw new SolrException(SERVER_ERROR,
+          "field " + versionField + " must be defined in schema, be stored, and be single valued.");
+    }
+
+    try {
+      ValueSource vs = userVersionField.getType().getValueSource(userVersionField, null);
+      useFieldCache = true;
+    } catch (Exception e) {
+      log.warn("Can't use fieldcache/valuesource: " + e.getMessage());
+    }
+  }
+
+
+
+  private static class DocBasedVersionConstraintsProcessor
+    extends UpdateRequestProcessor {
+
+    private final String versionFieldName;
+    private final SchemaField userVersionField;
+    private final SchemaField solrVersionField;
+    private final boolean ignoreOldUpdates;
+    private final String deleteVersionParamName;
+    private final SolrCore core;
+
+    private long oldSolrVersion;  // current _version_ of the doc in the index/update log
+    private DistributedUpdateProcessor distribProc;  // the distributed update processor following us
+    private DistributedUpdateProcessor.DistribPhase phase;
+    private boolean useFieldCache;
+
+    public DocBasedVersionConstraintsProcessor(String versionField,
+                                               boolean ignoreOldUpdates,
+                                               String deleteVersionParamName,
+                                               boolean useFieldCache,
+                                               SolrQueryRequest req, 
+                                               SolrQueryResponse rsp, 
+                                               UpdateRequestProcessor next ) {
+      super(next);
+      this.ignoreOldUpdates = ignoreOldUpdates;
+      this.deleteVersionParamName = deleteVersionParamName;
+      this.core = req.getCore();
+      this.versionFieldName = versionField;
+      this.userVersionField = core.getLatestSchema().getField(versionField);
+      this.solrVersionField = core.getLatestSchema().getField(VersionInfo.VERSION_FIELD);
+      this.useFieldCache = useFieldCache;
+
+      for (UpdateRequestProcessor proc = next ;proc != null; proc = proc.next) {
+        if (proc instanceof DistributedUpdateProcessor) {
+          distribProc = (DistributedUpdateProcessor)proc;
+          break;
+        }
+      }
+
+      if (distribProc == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow DocBasedVersionConstraintsProcessor");
+      }
+
+      phase = DistributedUpdateProcessor.DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+    }
+    
+    /**
+     * Inspects a raw field value (which may come from a doc in the index, or a 
+     * doc in the UpdateLog that still has String values, or a String sent by 
+     * the user as a param) and if it is a String, asks the versionField FieldType 
+     * to convert it to an Object suitable for comparison.
+     */
+    private Object convertFieldValueUsingType(SchemaField sf, final Object rawValue) {
+      if (rawValue instanceof CharSequence) {
+        // in theory, the FieldType might still be CharSequence based,
+        // but in that case trust it to do an identiy conversion...
+        FieldType fieldType = userVersionField.getType();
+        BytesRef term = new BytesRef();
+        fieldType.readableToIndexed((CharSequence)rawValue, term);
+        return fieldType.toObject(userVersionField, term);
+      }
+      // else...
+      return rawValue;
+    }
+
+
+    /**
+     * Returns true if the specified new version value is greater the the one
+     * already known to exist for the document, or the document does not already
+     * exist.
+     * Returns false if the specified new version is not high enough but the
+     * processor has been configured with ignoreOldUpdates=true
+     * Throws a SolrException if the version is not high enough and
+     * ignoreOldUpdates=false
+     */
+    private boolean isVersionNewEnough(BytesRef indexedDocId,
+                                       Object newUserVersion) throws IOException {
+      assert null != indexedDocId;
+      assert null != newUserVersion;
+
+      oldSolrVersion = -1;
+
+      newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
+      Object oldUserVersion = null;
+      SolrInputDocument oldDoc = null;
+
+      if (useFieldCache) {
+        oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId);
+        if (oldDoc == RealTimeGetComponent.DELETED) {
+          return true;
+        }
+        if (oldDoc == null) {
+          // need to look up in index now...
+          RefCounted<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();
+          try {
+            SolrIndexSearcher searcher = newestSearcher.get();
+            long lookup = searcher.lookupId(indexedDocId);
+            if (lookup < 0) {
+              // doc not in index either...
+              return true;
+            }
+
+            ValueSource vs = solrVersionField.getType().getValueSource(solrVersionField, null);
+            Map context = ValueSource.newContext(searcher);
+            vs.createWeight(context, searcher);
+            FunctionValues fv = vs.getValues(context, searcher.getTopReaderContext().leaves().get((int)(lookup>>32)));
+            oldSolrVersion = fv.longVal((int)lookup);
+
+            vs = userVersionField.getType().getValueSource(userVersionField, null);
+            context = ValueSource.newContext(searcher);
+            vs.createWeight(context, searcher);
+            fv = vs.getValues(context, searcher.getTopReaderContext().leaves().get((int)(lookup>>32)));
+            oldUserVersion = fv.objectVal((int)lookup);
+
+          } catch (IOException e) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
+          } finally {
+            if (newestSearcher != null) {
+              newestSearcher.decref();
+            }
+          }
+        }
+      } else {
+        // stored fields only...
+
+        oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId);
+
+        if (null == oldDoc) {
+          return true;
+        }
+      }
+
+
+      if (oldDoc != null) {
+        oldUserVersion = oldDoc.getFieldValue(versionFieldName);
+        // Make the FieldType resolve any conversion we need.
+        oldUserVersion = convertFieldValueUsingType(userVersionField, oldUserVersion);
+
+        Object o = oldDoc.getFieldValue(solrVersionField.getName());
+        if (o == null) {
+          throw new SolrException(SERVER_ERROR, "No _version_ for document "+ oldDoc);
+        }
+        oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
+      }
+
+
+      if ( null == oldUserVersion) {
+        // could happen if they turn this feature on after building an index
+        // w/o the versionField
+        throw new SolrException(SERVER_ERROR,
+            "Doc exists in index, but has null versionField: "
+                + versionFieldName);
+      }
+
+
+      if (! (oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable) ) {
+        throw new SolrException(BAD_REQUEST,
+            "old version and new version are not comparable: " +
+                oldUserVersion.getClass()+" vs "+newUserVersion.getClass());
+      }
+
+      try {
+        if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
+          return true;
+        }
+        if (ignoreOldUpdates) {
+
+          return false;
+        } else {
+          throw new SolrException(CONFLICT,
+              "user version is not high enough: " + newUserVersion);
+        }
+      } catch (ClassCastException e) {
+        throw new SolrException(BAD_REQUEST,
+            "old version and new version are not comparable: " +
+                oldUserVersion.getClass()+" vs "+newUserVersion.getClass() +
+                ": " + e.getMessage(), e);
+
+      }
+    }
+
+
+    private boolean isVersionNewEnoughStoredOnly(BytesRef indexedDocId,
+                                       Object newUserVersion) throws IOException {
+      assert null != indexedDocId;
+      assert null != newUserVersion;
+
+      oldSolrVersion = -1;
+
+      // :TODO: would be nice if a full RTG was not always needed here, ideas...
+      //  - first check fieldCache/docVals - if a versionField exists
+      //    in index that is already greater then this cmd, fail fast 
+      //    (no need to check updateLog, new version already too low)
+      //  - first check if docId is in the updateLog w/o doing the full get, if 
+      //    it's not then check fieldCache/docVals
+      //  - track versionField externally from updateLog (or as a special case 
+      //    that can be looked up by itself - similar to how _version_ is dealt with)
+      //
+      // Depending on if/when/how this is changed, what we assert about
+      // versionField on init will need updated.
+
+
+      newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
+      Object oldUserVersion = null;
+
+
+      SolrInputDocument oldDoc =
+        RealTimeGetComponent.getInputDocument(core, indexedDocId);
+
+      if (null == oldDoc) {
+        return true;
+      }
+      
+      oldUserVersion = oldDoc.getFieldValue(versionFieldName);
+      if ( null == oldUserVersion) {
+        // could happen if they turn this feature on after building an index
+        // w/o the versionField
+        throw new SolrException(SERVER_ERROR,
+                                "Doc exists in index, but has null versionField: "
+                                + versionFieldName);
+      }
+
+      // Make the FieldType resolve any conversion we need.
+      oldUserVersion = convertFieldValueUsingType(userVersionField, oldUserVersion);
+
+      if (! (oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable) ) {
+        throw new SolrException(BAD_REQUEST, 
+                                "old version and new version are not comparable: " +
+                                oldUserVersion.getClass()+" vs "+newUserVersion.getClass());
+      }
+      
+      try { 
+        if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
+          // since we're going to proceed with this update, we need to find the _version_
+          // so we can use optimistic concurrency.
+
+          Object o = oldDoc.getFieldValue(VersionInfo.VERSION_FIELD);
+          if (o == null) {
+            throw new SolrException(SERVER_ERROR, "No _version_ for document "+ oldDoc);
+          }
+          oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
+          return true;
+        }
+        if (ignoreOldUpdates) {
+          return false;
+        } else {
+          throw new SolrException(CONFLICT,
+                                  "user version is not high enough: " + newUserVersion);
+        }
+      } catch (ClassCastException e) {
+        throw new SolrException(BAD_REQUEST, 
+                                "old version and new version are not comparable: " +
+                                oldUserVersion.getClass()+" vs "+newUserVersion.getClass() +
+                                ": " + e.getMessage(), e);
+                                
+      }
+    }
+
+    public boolean isLeader(UpdateCommand cmd) {
+      if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+        return false;
+      }
+      if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) {
+        return false;
+      }
+      // if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
+      return distribProc.isLeader(cmd);
+    }
+
+    public void processAdd(AddUpdateCommand cmd) throws IOException {
+      if (!isLeader(cmd)) {
+        super.processAdd(cmd);
+      }
+
+      final SolrInputDocument newDoc = cmd.getSolrInputDocument();
+
+      Object newVersion = newDoc.getFieldValue(versionFieldName);
+      if ( null == newVersion ) {
+        throw new SolrException(BAD_REQUEST, "Doc does not have versionField: " + versionFieldName);
+      }
+
+      for (int i=0; ;i++) {
+        // Log a warning every 256 retries.... even a few retries should normally be very unusual.
+        if ((i&0xff) == 0xff) {
+          log.warn("Unusual number of optimistic concurrency retries: retries=" + i + " cmd=" + cmd);
+        }
+
+        if (!isVersionNewEnough(cmd.getIndexedId(), newVersion)) {
+          // drop older update
+          return;
+        }
+
+        try {
+          cmd.setVersion(oldSolrVersion);  // use optimistic concurrency to ensure that the doc has not changed in the meantime
+          super.processAdd(cmd);
+          return;
+        } catch (SolrException e) {
+          if (e.code() == 409) {
+            continue;  // if a version conflict, retry
+          }
+          throw e;  // rethrow
+        }
+
+      }
+    }
+
+    public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+      if (null == deleteVersionParamName) {
+        // not suppose to look at deletes at all
+        super.processDelete(cmd);
+        return;
+      }
+
+      if ( ! cmd.isDeleteById() ) {
+        // nothing to do
+        super.processDelete(cmd);
+        return;
+      }
+
+      String deleteParamValue = cmd.getReq().getParams().get(deleteVersionParamName);
+      if (null == deleteParamValue) {
+        throw new SolrException(BAD_REQUEST,
+            "Delete by ID must specify doc version param: " +
+                deleteVersionParamName);
+      }
+
+
+      if (!isLeader(cmd)) {
+        // transform delete to add earlier rather than later
+
+        SolrInputDocument newDoc = new SolrInputDocument();
+        newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
+            cmd.getId());
+        newDoc.setField(versionFieldName, deleteParamValue);
+
+        AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
+        newCmd.solrDoc = newDoc;
+        newCmd.commitWithin = cmd.commitWithin;
+        super.processAdd(newCmd);
+      }
+
+
+      for (int i=0; ;i++) {
+        // Log a warning every 256 retries.... even a few retries should normally be very unusual.
+        if ((i&0xff) == 0xff) {
+          log.warn("Unusual number of optimistic concurrency retries: retries=" + i + " cmd=" + cmd);
+        }
+
+        if (!isVersionNewEnough(cmd.getIndexedId(), deleteParamValue)) {
+          // drop this older update
+          return;
+        }
+
+        // :TODO: should this logic be split and driven by two params?
+        //   - deleteVersionParam to do a version check
+        //   - some new boolean param to determine if a stub document gets added in place?
+        try {
+          // drop the delete, and instead propogate an AddDoc that
+          // replaces the doc with a new "empty" one that records the deleted version
+
+          SolrInputDocument newDoc = new SolrInputDocument();
+          newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
+              cmd.getId());
+          newDoc.setField(versionFieldName, deleteParamValue);
+
+          AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
+          newCmd.solrDoc = newDoc;
+          newCmd.commitWithin = cmd.commitWithin;
+
+          newCmd.setVersion(oldSolrVersion);  // use optimistic concurrency to ensure that the doc has not changed in the meantime
+          super.processAdd(newCmd);
+          return;
+        } catch (SolrException e) {
+          if (e.code() == 409) {
+            continue;  // if a version conflict, retry
+          }
+          throw e;  // rethrow
+        }
+
+      }
+    }
+
+  } // end inner class
+  
+}

Added: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml?rev=1537587&view=auto
==============================================================================
Binary file - no diff available.

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,305 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.StrUtils;
+import org.junit.BeforeClass;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
+
+  String bucket1 = "shard1";      // shard1: top bits:10  80000000:ffffffff
+  String bucket2 = "shard2";      // shard2: top bits:00  00000000:7fffffff
+
+  private static String vfield = "my_version_l";
+
+
+  @BeforeClass
+  public static void beforeShardHashingTest() throws Exception {
+    useFactory(null);
+  }
+
+  @Override
+  protected String getCloudSolrConfig() {
+    return "solrconfig-externalversionconstraint.xml";
+  }
+
+  public TestDistribDocBasedVersion() {
+    schemaString = "schema15.xml";      // we need a string id
+    super.sliceCount = 2;
+    super.shardCount = 4;
+    super.fixShardCount = true;  // we only want to test with exactly 2 slices.
+
+
+
+
+    /***
+     hash of a is 3c2569b2 high bits=0 shard=shard3
+     hash of b is 95de7e03 high bits=2 shard=shard1
+     hash of c is e132d65f high bits=3 shard=shard2
+     hash of d is 27191473 high bits=0 shard=shard3
+     hash of e is 656c4367 high bits=1 shard=shard4
+     hash of f is 2b64883b high bits=0 shard=shard3
+     hash of g is f18ae416 high bits=3 shard=shard2
+     hash of h is d482b2d3 high bits=3 shard=shard2
+     hash of i is 811a702b high bits=2 shard=shard1
+     hash of j is ca745a39 high bits=3 shard=shard2
+     hash of k is cfbda5d1 high bits=3 shard=shard2
+     hash of l is 1d5d6a2c high bits=0 shard=shard3
+     hash of m is 5ae4385c high bits=1 shard=shard4
+     hash of n is c651d8ac high bits=3 shard=shard2
+     hash of o is 68348473 high bits=1 shard=shard4
+     hash of p is 986fdf9a high bits=2 shard=shard1
+     hash of q is ff8209e8 high bits=3 shard=shard2
+     hash of r is 5c9373f1 high bits=1 shard=shard4
+     hash of s is ff4acaf1 high bits=3 shard=shard2
+     hash of t is ca87df4d high bits=3 shard=shard2
+     hash of u is 62203ae0 high bits=1 shard=shard4
+     hash of v is bdafcc55 high bits=2 shard=shard1
+     hash of w is ff439d1f high bits=3 shard=shard2
+     hash of x is 3e9a9b1b high bits=0 shard=shard3
+     hash of y is 477d9216 high bits=1 shard=shard4
+     hash of z is c1f69a17 high bits=3 shard=shard2
+     ***/
+  }
+
+  @Override
+  public void doTest() throws Exception {
+    boolean testFinished = false;
+    try {
+      handle.clear();
+      handle.put("QTime", SKIPVAL);
+      handle.put("timestamp", SKIPVAL);
+
+      // todo: do I have to do this here?
+      waitForRecoveriesToFinish(false);
+
+      doTestDocVersions();
+
+      testFinished = true;
+    } finally {
+      if (!testFinished) {
+        printLayoutOnTearDown = true;
+      }
+    }
+  }
+
+
+  private void doTestDocVersions() throws Exception {
+    log.info("### STARTING doTestDocVersions");
+    assertEquals(2, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size());
+
+    ss = cloudClient;
+
+    vadd("b!doc1", 10);
+    vadd("c!doc2", 11);
+    vadd("d!doc3", 10);
+    vadd("e!doc4", 11);
+
+    doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "10,11,10,11");
+
+    vadd("b!doc1", 5);
+    vadd("c!doc2", 10);
+    vadd("d!doc3", 9);
+    vadd("e!doc4", 8);
+
+    doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "10,11,10,11");
+
+    vadd("b!doc1", 24);
+    vadd("c!doc2", 23);
+    vadd("d!doc3", 22);
+    vadd("e!doc4", 21);
+
+    doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "24,23,22,21");
+
+    vdelete("b!doc1", 20);
+
+    doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "24,23,22,21");
+
+    vdelete("b!doc1", 30);
+
+    doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "30,23,22,21");
+
+    // try delete before add
+    vdelete("b!doc123", 100);
+    vadd("b!doc123", 99);
+    doRTG("b!doc123", "100");
+    // now add greater
+    vadd("b!doc123", 101);
+    doRTG("b!doc123", "101");
+
+
+    //
+    // now test with a non-smart client
+    //
+    List<CloudJettyRunner> runners = shardToJetty.get(bucket2);
+    CloudJettyRunner leader = shardToLeaderJetty.get(bucket2);
+    CloudJettyRunner replica =  null;
+    for (CloudJettyRunner r : runners) {
+      if (r != leader) replica = r;
+    }
+
+    ss = replica.client.solrClient;
+
+    vadd("b!doc5", 10);
+    vadd("c!doc6", 11);
+    vadd("d!doc7", 10);
+    vadd("e!doc8", 11);
+
+    doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "10,11,10,11");
+
+    vadd("b!doc5", 5);
+    vadd("c!doc6", 10);
+    vadd("d!doc7", 9);
+    vadd("e!doc8", 8);
+
+    doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "10,11,10,11");
+
+    vadd("b!doc5", 24);
+    vadd("c!doc6", 23);
+    vadd("d!doc7", 22);
+    vadd("e!doc8", 21);
+
+    doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "24,23,22,21");
+
+    vdelete("b!doc5", 20);
+
+    doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "24,23,22,21");
+
+    vdelete("b!doc5", 30);
+
+    doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "30,23,22,21");
+
+    // try delete before add
+    vdelete("b!doc1234", 100);
+    vadd("b!doc1234", 99);
+    doRTG("b!doc1234", "100");
+    // now add greater
+    vadd("b!doc1234", 101);
+    doRTG("b!doc1234", "101");
+
+    commit();
+
+    // check liveness for all docs
+    doQuery("b!doc123,101,c!doc2,23,d!doc3,22,e!doc4,21,b!doc1234,101,c!doc6,23,d!doc7,22,e!doc8,21", "q","live_b:true");
+    doQuery("b!doc1,30,b!doc5,30", "q","live_b:false");
+
+    // delete by query should just work like normal
+    doDBQ("id:b!doc1 OR id:e*");
+    commit();
+
+    doQuery("b!doc123,101,c!doc2,23,d!doc3,22,b!doc1234,101,c!doc6,23,d!doc7,22", "q","live_b:true");
+    doQuery("b!doc5,30", "q","live_b:false");
+
+  }
+
+  SolrServer ss;
+
+  void vdelete(String id, long version) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteById(id);
+    req.setParam("del_version", Long.toString(version));
+    ss.request(req);
+    // req.process(cloudClient);
+  }
+
+  void vadd(String id, long version) throws Exception {
+    index("id", id, vfield, version);
+  }
+
+  void doQuery(String expectedDocs, String... queryParams) throws Exception {
+
+    List<String> strs = StrUtils.splitSmart(expectedDocs, ",", true);
+    Map<String, Object> expectedIds = new HashMap<String,Object>();
+    for (int i=0; i<strs.size(); i+=2) {
+      String id = strs.get(i);
+      String vS = strs.get(i+1);
+      Long v = Long.valueOf(vS);
+      expectedIds.put(id,v);
+    }
+
+    QueryResponse rsp = cloudClient.query(params(queryParams));
+    Map<String, Object> obtainedIds = new HashMap<String,Object>();
+    for (SolrDocument doc : rsp.getResults()) {
+      obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expectedIds, obtainedIds);
+  }
+
+
+  void doRTG(String ids, String versions) throws Exception {
+    Map<String, Object> expectedIds = new HashMap<String,Object>();
+    List<String> strs = StrUtils.splitSmart(ids, ",", true);
+    List<String> verS = StrUtils.splitSmart(versions, ",", true);
+    for (int i=0; i<strs.size(); i++) {
+      expectedIds.put(strs.get(i), Long.valueOf(verS.get(i)));
+    }
+
+    ss.query(params("qt","/get", "ids",ids));
+
+    QueryResponse rsp = cloudClient.query(params("qt","/get", "ids",ids));
+    Map<String, Object> obtainedIds = new HashMap<String,Object>();
+    for (SolrDocument doc : rsp.getResults()) {
+      obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+    }
+
+    assertEquals(expectedIds, obtainedIds);
+  }
+
+  void doRTG(String ids) throws Exception {
+    ss.query(params("qt","/get", "ids",ids));
+
+    Set<String> expectedIds = new HashSet<String>( StrUtils.splitSmart(ids, ",", true) );
+
+    QueryResponse rsp = cloudClient.query(params("qt","/get", "ids",ids));
+    Set<String> obtainedIds = new HashSet<String>();
+    for (SolrDocument doc : rsp.getResults()) {
+      obtainedIds.add((String) doc.get("id"));
+    }
+
+    assertEquals(expectedIds, obtainedIds);
+  }
+
+
+  // TODO: refactor some of this stuff into the SolrJ client... it should be easier to use
+  void doDBQ(String q, String... reqParams) throws Exception {
+    UpdateRequest req = new UpdateRequest();
+    req.deleteByQuery(q);
+    req.setParams(params(reqParams));
+    req.process(cloudClient);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+}

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,321 @@
+package org.apache.solr.search;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.util.TestHarness;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.noggit.ObjectBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class TestStressUserVersions extends TestRTGBase {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    initCore("solrconfig-externalversionconstraint.xml","schema15.xml");
+  }
+
+  private static String vfield = "my_version_l";
+  private static String lfield = "live_b";
+  private static String dversion = "del_version";
+
+
+  public static void verbose(Object... args) {
+    // if (!log.isDebugEnabled()) return;
+    StringBuilder sb = new StringBuilder("VERBOSE:");
+    for (Object o : args) {
+      sb.append(' ');
+      sb.append(o==null ? "(null)" : o.toString());
+    }
+    log.info(sb.toString());
+  }
+
+  // This version simulates user versions sometimes being reordered.
+  // It should fail (and currently does) if optimistic concurrency is disabled (cmd.setVersion(currVersion))
+  // in DocBasedVersionConstraintsProcessor.
+  @Test
+  public void testStressReorderVersions() throws Exception {
+    clearIndex();
+    assertU(commit());
+
+    final int commitPercent = 5 + random().nextInt(20);
+    final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
+    final int deletePercent = 4+random().nextInt(25);
+    final int deleteByQueryPercent = random().nextInt(8);
+    final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
+    int nWriteThreads = 5 + random().nextInt(25);
+
+    final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
+
+    // query variables
+    final int percentRealtimeQuery = 75;
+    final AtomicLong operations = new AtomicLong(10000);  // number of query operations to perform in total - ramp up for a longer test
+    int nReadThreads = 5 + random().nextInt(25);
+
+
+    /** // testing
+     final int commitPercent = 5;
+     final int softCommitPercent = 100; // what percent of the commits are soft
+     final int deletePercent = 0;
+     final int deleteByQueryPercent = 50;
+     final int ndocs = 1;
+     int nWriteThreads = 2;
+
+     final int maxConcurrentCommits = nWriteThreads;   // number of committers at a time... it should be <= maxWarmingSearchers
+
+     // query variables
+     final int percentRealtimeQuery = 101;
+     final AtomicLong operations = new AtomicLong(50000);  // number of query operations to perform in total
+     int nReadThreads = 1;
+     **/
+
+
+    verbose("commitPercent",commitPercent, "softCommitPercent",softCommitPercent, "deletePercent",deletePercent, "deleteByQueryPercent",deleteByQueryPercent
+        , "ndocs",ndocs,"nWriteThreads",nWriteThreads,"percentRealtimeQuery",percentRealtimeQuery,"operations",operations, "nReadThreads",nReadThreads);
+
+    initModel(ndocs);
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+
+    final AtomicLong testVersion = new AtomicLong(0);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random().nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.get() > 0) {
+              int oper = rand.nextInt(100);
+
+              if (oper < commitPercent) {
+                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                  Map<Integer,DocInfo> newCommittedModel;
+                  long version;
+
+                  synchronized(TestStressUserVersions.this) {
+                    newCommittedModel = new HashMap<Integer,DocInfo>(model);  // take a snapshot
+                    version = snapshotCount++;
+                  }
+
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    verbose("softCommit start");
+                    assertU(TestHarness.commit("softCommit","true"));
+                    verbose("softCommit end");
+                  } else {
+                    verbose("hardCommit start");
+                    assertU(commit());
+                    verbose("hardCommit end");
+                  }
+
+                  synchronized(TestStressUserVersions.this) {
+                    // install this model snapshot only if it's newer than the current one
+                    if (version >= committedModelClock) {
+                      if (VERBOSE) {
+                        verbose("installing new committedModel version="+committedModelClock);
+                      }
+                      committedModel = newCommittedModel;
+                      committedModelClock = version;
+                    }
+                  }
+                }
+                numCommitting.decrementAndGet();
+                continue;
+              }
+
+
+              int id;
+
+              if (rand.nextBoolean()) {
+                id = rand.nextInt(ndocs);
+              } else {
+                id = lastId;  // reuse the last ID half of the time to force more race conditions
+              }
+
+              // set the lastId before we actually change it sometimes to try and
+              // uncover more race conditions between writing and reading
+              boolean before = rand.nextBoolean();
+              if (before) {
+                lastId = id;
+              }
+
+              DocInfo info = model.get(id);
+
+              long val = info.val;
+              long nextVal = Math.abs(val)+1;
+
+              // the version we set on the update should determine who wins
+              // These versions are not derived from the actual leader update handler hand hence this
+              // test may need to change depending on how we handle version numbers.
+              long version = testVersion.incrementAndGet();
+
+              // yield after getting the next version to increase the odds of updates happening out of order
+              if (rand.nextBoolean()) Thread.yield();
+
+              if (oper < commitPercent + deletePercent) {
+                verbose("deleting id",id,"val=",nextVal,"version",version);
+
+                Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params(dversion, Long.toString(version)));
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (Math.abs(version) > Math.abs(currInfo.version)) {
+                    model.put(id, new DocInfo(version, -nextVal));
+                  }
+                }
+
+                verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
+
+              } else {
+                verbose("adding id", id, "val=", nextVal,"version",version);
+
+                Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), vfield, Long.toString(version)), null);
+
+                // only update model if the version is newer
+                synchronized (model) {
+                  DocInfo currInfo = model.get(id);
+                  if (version > currInfo.version) {
+                    model.put(id, new DocInfo(version, nextVal));
+                  }
+                }
+
+                if (VERBOSE) {
+                  verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
+                }
+
+              }
+              // }   // end sync
+
+              if (!before) {
+                lastId = id;
+              }
+            }
+          } catch (Throwable e) {
+            operations.set(-1L);
+            log.error("",e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random().nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+              DocInfo info;
+
+              if (realTime) {
+                info = model.get(id);
+              } else {
+                synchronized(TestStressUserVersions.this) {
+                  info = committedModel.get(id);
+                }
+              }
+
+              if (VERBOSE) {
+                verbose("querying id", id);
+              }
+              SolrQueryRequest sreq;
+              if (realTime) {
+                sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+              } else {
+                sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              }
+
+              String response = h.query(sreq);
+              Map rsp = (Map)ObjectBuilder.fromJSON(response);
+              List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+              if (doclist.size() == 0) {
+                // there's no info we can get back with a delete, so not much we can check without further synchronization
+              } else {
+                assertEquals(1, doclist.size());
+                boolean isLive = (Boolean)(((Map)doclist.get(0)).get(lfield));
+                long foundVer = (Long)(((Map)doclist.get(0)).get(vfield));
+
+                if (isLive) {
+                  long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+                  if (foundVer < Math.abs(info.version)
+                      || (foundVer == info.version && foundVal != info.val) ) {    // if the version matches, the val must
+                    log.error("ERROR, id=" + id + " found=" + response + " model" + info);
+                    assertTrue(false);
+                  }
+                } else {
+                  // if the doc is deleted (via tombstone), it shouldn't have a value on it.
+                  assertNull( ((Map)doclist.get(0)).get(field) );
+
+                  if (foundVer < Math.abs(info.version)) {
+                    log.error("ERROR, id=" + id + " found=" + response + " model" + info);
+                    assertTrue(false);
+                  }
+                }
+
+              }
+            }
+          } catch (Throwable e) {
+            operations.set(-1L);
+            log.error("",e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+  }
+
+}

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util._TestUtil;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    initCore("solrconfig-externalversionconstraint.xml", "schema15.xml");
+  }
+
+  @Before
+  public void before() throws Exception {
+    assertU(delQ("*:*"));
+    assertU(commit());
+  }
+
+
+  public void testSimpleUpdates() throws Exception {
+
+    // skip low version against commited data
+    assertU(adoc("id", "aaa", "name", "a1", "my_version_l", "1001"));
+    assertU(commit());
+    assertU(adoc("id", "aaa", "name", "a2", "my_version_l", "1002"));
+    assertU(commit());
+    assertU(adoc("id", "aaa", "name", "XX", "my_version_l",    "1"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+
+    // skip low version against uncommited data from updateLog
+    assertU(adoc("id", "aaa", "name", "a3", "my_version_l", "1003"));
+    assertU(adoc("id", "aaa", "name", "XX", "my_version_l",    "7"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a3'}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("q","+id:aaa +name:a3"), "/response/numFound==1");
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a3'}}");
+
+    // interleave updates to multiple docs using same versions
+    for (long ver = 1010; ver < 1020; ver++) {
+      for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+        assertU(adoc("id", id, "my_version_l", ""+ver));
+      }
+    }
+    for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+      assertU(adoc("id", id, "name", "XX", "my_version_l", "10"));
+      assertJQ(req("qt","/get", "id",id, "fl","my_version_l")
+               , "=={'doc':{'my_version_l':"+1019+"}}");
+    }
+    assertU(commit());
+    assertJQ(req("q","name:XX"), "/response/numFound==0");
+    for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+      assertJQ(req("q","+id:"+id), "/response/numFound==1");
+      assertJQ(req("q","+name:XX +id:"+id), "/response/numFound==0");
+      assertJQ(req("q","+id:"+id + " +my_version_l:1019"), "/response/numFound==1");
+      assertJQ(req("qt","/get", "id",id, "fl","my_version_l")
+               , "=={'doc':{'my_version_l':"+1019+"}}");
+    }
+  }
+
+  public void testSimpleDeletes() throws Exception {
+
+    // skip low version delete against commited doc
+    assertU(adoc("id", "aaa", "name", "a1", "my_version_l", "1001"));
+    assertU(commit());
+    assertU(adoc("id", "aaa", "name", "a2", "my_version_l", "1002"));
+    assertU(commit());
+    deleteAndGetVersion("aaa",
+                        params("del_version", "7"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+
+    // skip low version delete against uncommited doc from updateLog
+    assertU(adoc("id", "aaa", "name", "a3", "my_version_l", "1003"));
+    deleteAndGetVersion("aaa",
+                        params("del_version", "8"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a3'}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:a3"), "/response/numFound==1");
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a3'}}");
+
+    // skip low version add against uncommited "delete" from updateLog
+    deleteAndGetVersion("aaa", params("del_version", "1010"));
+    assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "22"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}");
+
+    // skip low version add against committed "delete"
+    // (delete was already done & committed above)
+    assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "23"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}");
+  }
+
+  /**
+   * Sanity check that there are no hardcoded assumptions about the 
+   * field type used that could byte us in the ass.
+   */
+  public void testFloatVersionField() throws Exception {
+
+    // skip low version add & low version delete against commited doc
+    updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_f", "10.01")),
+            params("update.chain","external-version-float"));
+    assertU(commit());
+    updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "4.2")),
+            params("update.chain","external-version-float"));
+    assertU(commit());
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a1'}}");
+    deleteAndGetVersion("aaa", params("del_version", "7", 
+                                      "update.chain","external-version-float"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a1'}}");
+    assertU(commit());
+    
+    // skip low version delete against uncommited doc from updateLog
+    updateJ(jsonAdd(sdoc("id", "aaa", "name", "a2", "my_version_f", "10.02")), 
+            params("update.chain","external-version-float"));
+    deleteAndGetVersion("aaa", params("del_version", "8", 
+                                      "update.chain","external-version-float"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+
+    // skip low version add against uncommited "delete" from updateLog
+    deleteAndGetVersion("aaa", params("del_version", "10.10",
+                                      "update.chain","external-version-float"));
+    updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "10.05")),
+            params("update.chain","external-version-float"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+             , "=={'doc':{'my_version_f':10.10}}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+             , "=={'doc':{'my_version_f':10.10}}");
+
+    // skip low version add against committed "delete"
+    // (delete was already done & committed above)
+    updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "10.09")),
+            params("update.chain","external-version-float"));
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+             , "=={'doc':{'my_version_f':10.10}}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+             , "=={'doc':{'my_version_f':10.10}}");
+  }
+
+  public void testFailOnOldVersion() throws Exception {
+
+    // fail low version add & low version delete against commited doc
+    updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_l", "1001")),
+            params("update.chain","external-version-failhard"));
+    assertU(commit());
+    try {
+      updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "42")),
+              params("update.chain","external-version-failhard"));
+      fail("no 409");
+    } catch (SolrException ex) {
+      assertEquals(409, ex.code());
+    }
+    assertU(commit());
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a1'}}");
+    try {
+      deleteAndGetVersion("aaa", params("del_version", "7", 
+                                        "update.chain","external-version-failhard"));
+      fail("no 409");
+    } catch (SolrException ex) {
+      assertEquals(409, ex.code());
+    }
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a1'}}");
+    assertU(commit());
+    
+    // fail low version delete against uncommited doc from updateLog
+    updateJ(jsonAdd(sdoc("id", "aaa", "name", "a2", "my_version_l", "1002")), 
+            params("update.chain","external-version-failhard"));
+    try {
+      deleteAndGetVersion("aaa", params("del_version", "8", 
+                                        "update.chain","external-version-failhard"));
+      fail("no 409");
+    } catch (SolrException ex) {
+      assertEquals(409, ex.code());
+    }
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+    assertJQ(req("qt","/get", "id","aaa", "fl","name")
+             , "=={'doc':{'name':'a2'}}");
+
+    // fail low version add against uncommited "delete" from updateLog
+    deleteAndGetVersion("aaa", params("del_version", "1010",
+                                      "update.chain","external-version-failhard"));
+    try {
+      updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "1005")),
+              params("update.chain","external-version-failhard"));
+      fail("no 409");
+    } catch (SolrException ex) {
+      assertEquals(409, ex.code());
+    }
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}");
+
+    // fail low version add against committed "delete"
+    // (delete was already done & committed above)
+    try {
+      updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "1009")),
+              params("update.chain","external-version-failhard"));
+      fail("no 409");
+    } catch (SolrException ex) {
+      assertEquals(409, ex.code());
+    }
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}}");
+    assertU(commit());
+    assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+    assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+    assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+             , "=={'doc':{'my_version_l':1010}}");
+  }
+
+
+  /** 
+   * Proof of concept test demonstrating how to manage and periodically cleanup
+   * the "logically" deleted documents
+   */
+  public void testManagingDeletes() throws Exception {
+    // add some docs
+    for (long ver = 1010; ver < 1020; ver++) {
+      for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+        assertU(adoc("id", id, "name", "name_"+id, "my_version_l", ""+ver));
+      }
+    }
+    assertU(adoc("id", "aaa", "name", "name_aaa", "my_version_l", "1030"));
+    assertU(commit());
+    // sample queries
+    assertJQ(req("q","*:*",
+                 "fq","live_b:true")
+             ,"/response/numFound==4");
+    assertJQ(req("q","id:aaa",
+                 "fq","live_b:true",
+                 "fl","id,my_version_l")
+             ,"/response/numFound==1"
+             ,"/response/docs==[{'id':'aaa','my_version_l':1030}]}");
+    // logically delete
+    deleteAndGetVersion("aaa",
+                        params("del_version", "1031"));
+    assertU(commit());
+    // sample queries
+    assertJQ(req("q","*:*",
+                 "fq","live_b:true")
+             ,"/response/numFound==3");
+    assertJQ(req("q","id:aaa",
+                 "fq","live_b:true")
+             ,"/response/numFound==0");
+    // placeholder doc is still in the index though
+    assertJQ(req("q","id:aaa",
+                 "fq","live_b:false",
+                 "fq", "timestamp_tdt:[* TO *]",
+                 "fl","id,live_b,my_version_l")
+             ,"/response/numFound==1"
+             ,"/response/docs==[{'id':'aaa','my_version_l':1031,'live_b':false}]}");
+    // doc can't be re-added with a low version
+    assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "1025"));
+    assertU(commit());
+    assertJQ(req("q","id:aaa",
+                 "fq","live_b:true")
+             ,"/response/numFound==0");
+
+    // "dead" placeholder docs can be periodically cleaned up 
+    // ie: assertU(delQ("+live_b:false +timestamp_tdt:[* TO NOW/MINUTE-5MINUTE]"));
+    // but to prevent the test from ebing time sensitive we'll just purge them all
+    assertU(delQ("+live_b:false"));
+    assertU(commit());
+    // now doc can be re-added w/any version, no matter how low
+    assertU(adoc("id", "aaa", "name", "aaa", "my_version_l", "7"));
+    assertU(commit());
+    assertJQ(req("q","id:aaa",
+                 "fq","live_b:true",
+                 "fl","id,live_b,my_version_l")
+             ,"/response/numFound==1"
+             ,"/response/docs==[{'id':'aaa','my_version_l':7,'live_b':true}]}");
+    
+  }
+
+  /** 
+   * Constantly hammer the same doc with multiple concurrent threads and diff versions,
+   * confirm that the highest version wins.
+   */
+  public void testConcurrentAdds() throws Exception {
+    final int NUM_DOCS = atLeast(50);
+    final int MAX_CONCURENT = atLeast(10);
+    ExecutorService runner = Executors.newFixedThreadPool(MAX_CONCURENT);
+    // runner = Executors.newFixedThreadPool(1);    // to test single threaded
+    try {
+      for (int id = 0; id < NUM_DOCS; id++) {
+        final int numAdds = _TestUtil.nextInt(random(),3,MAX_CONCURENT);
+        final int winner = _TestUtil.nextInt(random(),0,numAdds-1);
+        final int winnerVersion = atLeast(100);
+        final boolean winnerIsDeleted = (0 == _TestUtil.nextInt(random(),0,4));
+        List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(numAdds);
+        for (int variant = 0; variant < numAdds; variant++) {
+          final boolean iShouldWin = (variant==winner);
+          final long version = (iShouldWin ? winnerVersion 
+                                : _TestUtil.nextInt(random(),1,winnerVersion-1));
+          if ((iShouldWin && winnerIsDeleted)
+              || (!iShouldWin && 0 == _TestUtil.nextInt(random(),0,4))) {
+            tasks.add(delayedDelete(""+id, ""+version));
+          } else {
+            tasks.add(delayedAdd("id",""+id,"name","name"+id+"_"+variant,
+                                 "my_version_l", ""+ version));
+          }
+        }
+        runner.invokeAll(tasks);
+        final String expectedDoc = "{'id':'"+id+"','my_version_l':"+winnerVersion +
+          ( ! winnerIsDeleted ? ",'name':'name"+id+"_"+winner+"'}" : "}");
+
+        assertJQ(req("qt","/get", "id",""+id, "fl","id,name,my_version_l")
+                 , "=={'doc':" + expectedDoc + "}");
+        assertU(commit());
+        assertJQ(req("q","id:"+id,
+                     "fl","id,name,my_version_l")
+                 ,"/response/numFound==1"
+                 ,"/response/docs==["+expectedDoc+"]");
+      }
+    } finally {
+      runner.shutdownNow();
+    }
+  }
+  
+  private Callable<Object> delayedAdd(final String... fields) {
+    return Executors.callable(new Runnable() {
+        public void run() {
+          // log.info("ADDING DOC: " + adoc(fields));
+          assertU(adoc(fields));
+        }
+      });
+  }
+  private Callable<Object> delayedDelete(final String id, final String externalVersion) {
+    return Executors.callable(new Runnable() {
+        public void run() {
+          try {
+            // Why does this throw "Exception" ???
+            // log.info("DELETING DOC: " + id + " v="+externalVersion);
+            deleteAndGetVersion(id, params("del_version", externalVersion));
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+  }
+  
+}