You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2013/10/31 20:13:36 UTC
svn commit: r1537587 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/component/
core/src/java/org/apache/solr/update/processor/
core/src/test-files/solr/collection1/conf/
core/src/test/org/apache/solr/cloud/ core/src/test/org/apac...
Author: yonik
Date: Thu Oct 31 19:13:35 2013
New Revision: 1537587
URL: http://svn.apache.org/r1537587
Log:
SOLR-5374: user version update processor
Added:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java (with props)
lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java (with props)
lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1537587&r1=1537586&r2=1537587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Oct 31 19:13:35 2013
@@ -115,6 +115,11 @@ New Features
* SOLR-5406: CloudSolrServer failed to propagate request parameters
along with delete updates. (yonik)
+ * SOLR-5374: Support user configured doc-centric versioning rules
+ via the optional DocBasedVersionConstraintsProcessorFactory
+ update processor (Hossman, yonik)
+
+
Bug Fixes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1537587&r1=1537586&r2=1537587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Thu Oct 31 19:13:35 2013
@@ -203,31 +203,46 @@ public class RealTimeGetComponent extend
}
+
+ public static SolrInputDocument DELETED = new SolrInputDocument();
+
+ /** returns the SolrInputDocument from the current tlog, or DELETED if it has been deleted, or
+ * null if there is no record of it in the current update log. If null is returned, it could
+ * still be in the latest index.
+ */
+ public static SolrInputDocument getInputDocumentFromTlog(SolrCore core, BytesRef idBytes) {
+
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+
+ if (ulog != null) {
+ Object o = ulog.lookup(idBytes);
+ if (o != null) {
+ // should currently be a List<Oper,Ver,Doc/Id>
+ List entry = (List)o;
+ assert entry.size() >= 3;
+ int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
+ switch (oper) {
+ case UpdateLog.ADD:
+ return (SolrInputDocument)entry.get(entry.size()-1);
+ case UpdateLog.DELETE:
+ return DELETED;
+ default:
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ }
+ }
+ }
+
+ return null;
+ }
+
public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException {
SolrInputDocument sid = null;
RefCounted<SolrIndexSearcher> searcherHolder = null;
try {
SolrIndexSearcher searcher = null;
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-
-
- if (ulog != null) {
- Object o = ulog.lookup(idBytes);
- if (o != null) {
- // should currently be a List<Oper,Ver,Doc/Id>
- List entry = (List)o;
- assert entry.size() >= 3;
- int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
- switch (oper) {
- case UpdateLog.ADD:
- sid = (SolrInputDocument)entry.get(entry.size()-1);
- break;
- case UpdateLog.DELETE:
- return null;
- default:
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
- }
- }
+ sid = getInputDocumentFromTlog(core, idBytes);
+ if (sid == DELETED) {
+ return null;
}
if (sid == null) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1537587&r1=1537586&r2=1537587&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Oct 31 19:13:35 2013
@@ -1111,6 +1111,25 @@ public class DistributedUpdateProcessor
}
}
+ // internal helper method to tell if we are the leader for an add or deleteById update
+ boolean isLeader(UpdateCommand cmd) {
+ updateCommand = cmd;
+
+ if (zkEnabled) {
+ zkCheck();
+ if (cmd instanceof AddUpdateCommand) {
+ AddUpdateCommand acmd = (AddUpdateCommand)cmd;
+ nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument());
+ } else if (cmd instanceof DeleteUpdateCommand) {
+ DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
+ nodes = setupRequest(dcmd.getId(), null);
+ }
+ } else {
+ isLeader = getNonZkLeaderAssumption(req);
+ }
+
+ return isLeader;
+ }
private void zkCheck() {
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update.processor;
+
+import org.apache.lucene.queries.function.FunctionValues;
+import org.apache.lucene.queries.function.ValueSource;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.FieldType;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.UpdateCommand;
+import org.apache.solr.update.VersionInfo;
+import org.apache.solr.util.RefCounted;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
+import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+/**
+ * <p>
+ * This Factory generates an UpdateProcessor that helps to enforce Version
+ * constraints on documents based on per-document version numbers using a configured
+ * name of a <code>versionField</code>. It should be configured on the "default"
+ * update processor somewhere before the DistributedUpdateProcessorFactory.
+ * As an example, see the solrconfig.xml that the tests use:
+ * solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml
+ * </p>
+ * <p>
+ * When documents are added through this processor, if a document with the same
+ * unique key already exists in the collection, then the value of the
+ * <code>versionField</code> in the <i>existing</i> document is not less then the
+ * field value in the <i>new</i> document then the new document is rejected with a
+ * 409 Version Conflict error.
+ * </p>
+ * <p>
+ * In addition to the mandatory <code>versionField</code> init param, two additional
+ * optional init params affect the behavior of this factory:
+ * </p>
+ * <ul>
+ * <li><code>deleteVersionParam</code> - This string parameter controls whether this
+ * processor will intercept and inspect Delete By Id commands in addition to adding
+ * documents. If specified, then the value will specify the name of a request
+ * paramater which becomes mandatory for all Delete By Id commands. This param
+ * must then be used to specify the document version associated with the delete.
+ * If the version specified using this param is not greater then the value in the
+ * <code>versionField</code> for any existing document, then the delete will fail
+ * with a 409 Version Conflict error. When using this param, Any Delete By Id
+ * command with a high enough document version number to succeed will be internally
+ * converted into an Add Document command that replaces the existing document with
+ * a new one which is empty except for the Unique Key and <code>versionField</code>
+ * to keeping a record of the deleted version so future Add Document commands will
+ * fail if their "new" version is not high enough.</li>
+ *
+ * <li><code>ignoreOldUpdates</code> - This boolean parameter defaults to
+ * <code>false</code>, but if set to <code>true</code> causes any update with a
+ * document version that is not great enough to be silently ignored (and return
+ * a status 200 to the client) instead of generating a 409 Version Conflict error.
+ * </li>
+ * </ul>
+ */
+public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestProcessorFactory implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
+ public final static Logger log = LoggerFactory.getLogger(DocBasedVersionConstraintsProcessorFactory.class);
+
+ private boolean ignoreOldUpdates = false;
+ private String versionField = null;
+ private String deleteVersionParamName = null;
+ private boolean useFieldCache;
+
+ @Override
+ public void init( NamedList args ) {
+
+ Object tmp = args.remove("versionField");
+ if (null == tmp) {
+ throw new SolrException(SERVER_ERROR,
+ "'versionField' must be configured");
+ }
+ if (! (tmp instanceof String) ) {
+ throw new SolrException(SERVER_ERROR,
+ "'versionField' must be configured as a <str>");
+ }
+ versionField = tmp.toString();
+
+ // optional
+ tmp = args.remove("deleteVersionParam");
+ if (null != tmp) {
+ if (! (tmp instanceof String) ) {
+ throw new SolrException(SERVER_ERROR,
+ "'deleteVersionParam' must be configured as a <str>");
+ }
+ deleteVersionParamName = tmp.toString();
+ }
+
+ // optional - defaults to false
+ tmp = args.remove("ignoreOldUpdates");
+ if (null != tmp) {
+ if (! (tmp instanceof Boolean) ) {
+ throw new SolrException(SERVER_ERROR,
+ "'ignoreOldUpdates' must be configured as a <bool>");
+ }
+ ignoreOldUpdates = ((Boolean)tmp).booleanValue();
+ }
+ super.init(args);
+ }
+
+
+ public UpdateRequestProcessor getInstance(SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ UpdateRequestProcessor next ) {
+ return new DocBasedVersionConstraintsProcessor(versionField,
+ ignoreOldUpdates,
+ deleteVersionParamName,
+ useFieldCache,
+ req, rsp, next);
+ }
+
+ @Override
+ public void inform(SolrCore core) {
+
+ if (core.getUpdateHandler().getUpdateLog() == null) {
+ throw new SolrException(SERVER_ERROR,
+ "updateLog must be enabled.");
+ }
+
+ if (core.getLatestSchema().getUniqueKeyField() == null) {
+ throw new SolrException(SERVER_ERROR,
+ "schema must have uniqueKey defined.");
+ }
+
+ SchemaField userVersionField = core.getLatestSchema().getField(versionField);
+ if (userVersionField == null || !userVersionField.stored() || userVersionField.multiValued()) {
+ throw new SolrException(SERVER_ERROR,
+ "field " + versionField + " must be defined in schema, be stored, and be single valued.");
+ }
+
+ try {
+ ValueSource vs = userVersionField.getType().getValueSource(userVersionField, null);
+ useFieldCache = true;
+ } catch (Exception e) {
+ log.warn("Can't use fieldcache/valuesource: " + e.getMessage());
+ }
+ }
+
+
+
+ private static class DocBasedVersionConstraintsProcessor
+ extends UpdateRequestProcessor {
+
+ private final String versionFieldName;
+ private final SchemaField userVersionField;
+ private final SchemaField solrVersionField;
+ private final boolean ignoreOldUpdates;
+ private final String deleteVersionParamName;
+ private final SolrCore core;
+
+ private long oldSolrVersion; // current _version_ of the doc in the index/update log
+ private DistributedUpdateProcessor distribProc; // the distributed update processor following us
+ private DistributedUpdateProcessor.DistribPhase phase;
+ private boolean useFieldCache;
+
+ public DocBasedVersionConstraintsProcessor(String versionField,
+ boolean ignoreOldUpdates,
+ String deleteVersionParamName,
+ boolean useFieldCache,
+ SolrQueryRequest req,
+ SolrQueryResponse rsp,
+ UpdateRequestProcessor next ) {
+ super(next);
+ this.ignoreOldUpdates = ignoreOldUpdates;
+ this.deleteVersionParamName = deleteVersionParamName;
+ this.core = req.getCore();
+ this.versionFieldName = versionField;
+ this.userVersionField = core.getLatestSchema().getField(versionField);
+ this.solrVersionField = core.getLatestSchema().getField(VersionInfo.VERSION_FIELD);
+ this.useFieldCache = useFieldCache;
+
+ for (UpdateRequestProcessor proc = next ;proc != null; proc = proc.next) {
+ if (proc instanceof DistributedUpdateProcessor) {
+ distribProc = (DistributedUpdateProcessor)proc;
+ break;
+ }
+ }
+
+ if (distribProc == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow DocBasedVersionConstraintsProcessor");
+ }
+
+ phase = DistributedUpdateProcessor.DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
+
+ }
+
+ /**
+ * Inspects a raw field value (which may come from a doc in the index, or a
+ * doc in the UpdateLog that still has String values, or a String sent by
+ * the user as a param) and if it is a String, asks the versionField FieldType
+ * to convert it to an Object suitable for comparison.
+ */
+ private Object convertFieldValueUsingType(SchemaField sf, final Object rawValue) {
+ if (rawValue instanceof CharSequence) {
+ // in theory, the FieldType might still be CharSequence based,
+ // but in that case trust it to do an identiy conversion...
+ FieldType fieldType = userVersionField.getType();
+ BytesRef term = new BytesRef();
+ fieldType.readableToIndexed((CharSequence)rawValue, term);
+ return fieldType.toObject(userVersionField, term);
+ }
+ // else...
+ return rawValue;
+ }
+
+
+ /**
+ * Returns true if the specified new version value is greater the the one
+ * already known to exist for the document, or the document does not already
+ * exist.
+ * Returns false if the specified new version is not high enough but the
+ * processor has been configured with ignoreOldUpdates=true
+ * Throws a SolrException if the version is not high enough and
+ * ignoreOldUpdates=false
+ */
+ private boolean isVersionNewEnough(BytesRef indexedDocId,
+ Object newUserVersion) throws IOException {
+ assert null != indexedDocId;
+ assert null != newUserVersion;
+
+ oldSolrVersion = -1;
+
+ newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
+ Object oldUserVersion = null;
+ SolrInputDocument oldDoc = null;
+
+ if (useFieldCache) {
+ oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId);
+ if (oldDoc == RealTimeGetComponent.DELETED) {
+ return true;
+ }
+ if (oldDoc == null) {
+ // need to look up in index now...
+ RefCounted<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();
+ try {
+ SolrIndexSearcher searcher = newestSearcher.get();
+ long lookup = searcher.lookupId(indexedDocId);
+ if (lookup < 0) {
+ // doc not in index either...
+ return true;
+ }
+
+ ValueSource vs = solrVersionField.getType().getValueSource(solrVersionField, null);
+ Map context = ValueSource.newContext(searcher);
+ vs.createWeight(context, searcher);
+ FunctionValues fv = vs.getValues(context, searcher.getTopReaderContext().leaves().get((int)(lookup>>32)));
+ oldSolrVersion = fv.longVal((int)lookup);
+
+ vs = userVersionField.getType().getValueSource(userVersionField, null);
+ context = ValueSource.newContext(searcher);
+ vs.createWeight(context, searcher);
+ fv = vs.getValues(context, searcher.getTopReaderContext().leaves().get((int)(lookup>>32)));
+ oldUserVersion = fv.objectVal((int)lookup);
+
+ } catch (IOException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
+ } finally {
+ if (newestSearcher != null) {
+ newestSearcher.decref();
+ }
+ }
+ }
+ } else {
+ // stored fields only...
+
+ oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId);
+
+ if (null == oldDoc) {
+ return true;
+ }
+ }
+
+
+ if (oldDoc != null) {
+ oldUserVersion = oldDoc.getFieldValue(versionFieldName);
+ // Make the FieldType resolve any conversion we need.
+ oldUserVersion = convertFieldValueUsingType(userVersionField, oldUserVersion);
+
+ Object o = oldDoc.getFieldValue(solrVersionField.getName());
+ if (o == null) {
+ throw new SolrException(SERVER_ERROR, "No _version_ for document "+ oldDoc);
+ }
+ oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
+ }
+
+
+ if ( null == oldUserVersion) {
+ // could happen if they turn this feature on after building an index
+ // w/o the versionField
+ throw new SolrException(SERVER_ERROR,
+ "Doc exists in index, but has null versionField: "
+ + versionFieldName);
+ }
+
+
+ if (! (oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable) ) {
+ throw new SolrException(BAD_REQUEST,
+ "old version and new version are not comparable: " +
+ oldUserVersion.getClass()+" vs "+newUserVersion.getClass());
+ }
+
+ try {
+ if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
+ return true;
+ }
+ if (ignoreOldUpdates) {
+
+ return false;
+ } else {
+ throw new SolrException(CONFLICT,
+ "user version is not high enough: " + newUserVersion);
+ }
+ } catch (ClassCastException e) {
+ throw new SolrException(BAD_REQUEST,
+ "old version and new version are not comparable: " +
+ oldUserVersion.getClass()+" vs "+newUserVersion.getClass() +
+ ": " + e.getMessage(), e);
+
+ }
+ }
+
+
+ private boolean isVersionNewEnoughStoredOnly(BytesRef indexedDocId,
+ Object newUserVersion) throws IOException {
+ assert null != indexedDocId;
+ assert null != newUserVersion;
+
+ oldSolrVersion = -1;
+
+ // :TODO: would be nice if a full RTG was not always needed here, ideas...
+ // - first check fieldCache/docVals - if a versionField exists
+ // in index that is already greater then this cmd, fail fast
+ // (no need to check updateLog, new version already too low)
+ // - first check if docId is in the updateLog w/o doing the full get, if
+ // it's not then check fieldCache/docVals
+ // - track versionField externally from updateLog (or as a special case
+ // that can be looked up by itself - similar to how _version_ is dealt with)
+ //
+ // Depending on if/when/how this is changed, what we assert about
+ // versionField on init will need updated.
+
+
+ newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
+ Object oldUserVersion = null;
+
+
+ SolrInputDocument oldDoc =
+ RealTimeGetComponent.getInputDocument(core, indexedDocId);
+
+ if (null == oldDoc) {
+ return true;
+ }
+
+ oldUserVersion = oldDoc.getFieldValue(versionFieldName);
+ if ( null == oldUserVersion) {
+ // could happen if they turn this feature on after building an index
+ // w/o the versionField
+ throw new SolrException(SERVER_ERROR,
+ "Doc exists in index, but has null versionField: "
+ + versionFieldName);
+ }
+
+ // Make the FieldType resolve any conversion we need.
+ oldUserVersion = convertFieldValueUsingType(userVersionField, oldUserVersion);
+
+ if (! (oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable) ) {
+ throw new SolrException(BAD_REQUEST,
+ "old version and new version are not comparable: " +
+ oldUserVersion.getClass()+" vs "+newUserVersion.getClass());
+ }
+
+ try {
+ if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
+ // since we're going to proceed with this update, we need to find the _version_
+ // so we can use optimistic concurrency.
+
+ Object o = oldDoc.getFieldValue(VersionInfo.VERSION_FIELD);
+ if (o == null) {
+ throw new SolrException(SERVER_ERROR, "No _version_ for document "+ oldDoc);
+ }
+ oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
+ return true;
+ }
+ if (ignoreOldUpdates) {
+ return false;
+ } else {
+ throw new SolrException(CONFLICT,
+ "user version is not high enough: " + newUserVersion);
+ }
+ } catch (ClassCastException e) {
+ throw new SolrException(BAD_REQUEST,
+ "old version and new version are not comparable: " +
+ oldUserVersion.getClass()+" vs "+newUserVersion.getClass() +
+ ": " + e.getMessage(), e);
+
+ }
+ }
+
+ public boolean isLeader(UpdateCommand cmd) {
+ if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
+ return false;
+ }
+ if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) {
+ return false;
+ }
+ // if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
+ return distribProc.isLeader(cmd);
+ }
+
+ public void processAdd(AddUpdateCommand cmd) throws IOException {
+ if (!isLeader(cmd)) {
+ super.processAdd(cmd);
+ }
+
+ final SolrInputDocument newDoc = cmd.getSolrInputDocument();
+
+ Object newVersion = newDoc.getFieldValue(versionFieldName);
+ if ( null == newVersion ) {
+ throw new SolrException(BAD_REQUEST, "Doc does not have versionField: " + versionFieldName);
+ }
+
+ for (int i=0; ;i++) {
+ // Log a warning every 256 retries.... even a few retries should normally be very unusual.
+ if ((i&0xff) == 0xff) {
+ log.warn("Unusual number of optimistic concurrency retries: retries=" + i + " cmd=" + cmd);
+ }
+
+ if (!isVersionNewEnough(cmd.getIndexedId(), newVersion)) {
+ // drop older update
+ return;
+ }
+
+ try {
+ cmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
+ super.processAdd(cmd);
+ return;
+ } catch (SolrException e) {
+ if (e.code() == 409) {
+ continue; // if a version conflict, retry
+ }
+ throw e; // rethrow
+ }
+
+ }
+ }
+
+ public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+ if (null == deleteVersionParamName) {
+ // not suppose to look at deletes at all
+ super.processDelete(cmd);
+ return;
+ }
+
+ if ( ! cmd.isDeleteById() ) {
+ // nothing to do
+ super.processDelete(cmd);
+ return;
+ }
+
+ String deleteParamValue = cmd.getReq().getParams().get(deleteVersionParamName);
+ if (null == deleteParamValue) {
+ throw new SolrException(BAD_REQUEST,
+ "Delete by ID must specify doc version param: " +
+ deleteVersionParamName);
+ }
+
+
+ if (!isLeader(cmd)) {
+ // transform delete to add earlier rather than later
+
+ SolrInputDocument newDoc = new SolrInputDocument();
+ newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
+ cmd.getId());
+ newDoc.setField(versionFieldName, deleteParamValue);
+
+ AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
+ newCmd.solrDoc = newDoc;
+ newCmd.commitWithin = cmd.commitWithin;
+ super.processAdd(newCmd);
+ }
+
+
+ for (int i=0; ;i++) {
+ // Log a warning every 256 retries.... even a few retries should normally be very unusual.
+ if ((i&0xff) == 0xff) {
+ log.warn("Unusual number of optimistic concurrency retries: retries=" + i + " cmd=" + cmd);
+ }
+
+ if (!isVersionNewEnough(cmd.getIndexedId(), deleteParamValue)) {
+ // drop this older update
+ return;
+ }
+
+ // :TODO: should this logic be split and driven by two params?
+ // - deleteVersionParam to do a version check
+ // - some new boolean param to determine if a stub document gets added in place?
+ try {
+ // drop the delete, and instead propogate an AddDoc that
+ // replaces the doc with a new "empty" one that records the deleted version
+
+ SolrInputDocument newDoc = new SolrInputDocument();
+ newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
+ cmd.getId());
+ newDoc.setField(versionFieldName, deleteParamValue);
+
+ AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
+ newCmd.solrDoc = newDoc;
+ newCmd.commitWithin = cmd.commitWithin;
+
+ newCmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
+ super.processAdd(newCmd);
+ return;
+ } catch (SolrException e) {
+ if (e.code() == 409) {
+ continue; // if a version conflict, retry
+ }
+ throw e; // rethrow
+ }
+
+ }
+ }
+
+ } // end inner class
+
+}
Added: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml?rev=1537587&view=auto
==============================================================================
Binary file - no diff available.
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,305 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.StrUtils;
+import org.junit.BeforeClass;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
+
+ String bucket1 = "shard1"; // shard1: top bits:10 80000000:ffffffff
+ String bucket2 = "shard2"; // shard2: top bits:00 00000000:7fffffff
+
+ private static String vfield = "my_version_l";
+
+
+ @BeforeClass
+ public static void beforeShardHashingTest() throws Exception {
+ useFactory(null);
+ }
+
+ @Override
+ protected String getCloudSolrConfig() {
+ return "solrconfig-externalversionconstraint.xml";
+ }
+
+ public TestDistribDocBasedVersion() {
+ schemaString = "schema15.xml"; // we need a string id
+ super.sliceCount = 2;
+ super.shardCount = 4;
+ super.fixShardCount = true; // we only want to test with exactly 2 slices.
+
+
+
+
+ /***
+ hash of a is 3c2569b2 high bits=0 shard=shard3
+ hash of b is 95de7e03 high bits=2 shard=shard1
+ hash of c is e132d65f high bits=3 shard=shard2
+ hash of d is 27191473 high bits=0 shard=shard3
+ hash of e is 656c4367 high bits=1 shard=shard4
+ hash of f is 2b64883b high bits=0 shard=shard3
+ hash of g is f18ae416 high bits=3 shard=shard2
+ hash of h is d482b2d3 high bits=3 shard=shard2
+ hash of i is 811a702b high bits=2 shard=shard1
+ hash of j is ca745a39 high bits=3 shard=shard2
+ hash of k is cfbda5d1 high bits=3 shard=shard2
+ hash of l is 1d5d6a2c high bits=0 shard=shard3
+ hash of m is 5ae4385c high bits=1 shard=shard4
+ hash of n is c651d8ac high bits=3 shard=shard2
+ hash of o is 68348473 high bits=1 shard=shard4
+ hash of p is 986fdf9a high bits=2 shard=shard1
+ hash of q is ff8209e8 high bits=3 shard=shard2
+ hash of r is 5c9373f1 high bits=1 shard=shard4
+ hash of s is ff4acaf1 high bits=3 shard=shard2
+ hash of t is ca87df4d high bits=3 shard=shard2
+ hash of u is 62203ae0 high bits=1 shard=shard4
+ hash of v is bdafcc55 high bits=2 shard=shard1
+ hash of w is ff439d1f high bits=3 shard=shard2
+ hash of x is 3e9a9b1b high bits=0 shard=shard3
+ hash of y is 477d9216 high bits=1 shard=shard4
+ hash of z is c1f69a17 high bits=3 shard=shard2
+ ***/
+ }
+
+ @Override
+ public void doTest() throws Exception {
+ boolean testFinished = false;
+ try {
+ handle.clear();
+ handle.put("QTime", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ // todo: do I have to do this here?
+ waitForRecoveriesToFinish(false);
+
+ doTestDocVersions();
+
+ testFinished = true;
+ } finally {
+ if (!testFinished) {
+ printLayoutOnTearDown = true;
+ }
+ }
+ }
+
+
+ private void doTestDocVersions() throws Exception {
+ log.info("### STARTING doTestDocVersions");
+ assertEquals(2, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size());
+
+ ss = cloudClient;
+
+ vadd("b!doc1", 10);
+ vadd("c!doc2", 11);
+ vadd("d!doc3", 10);
+ vadd("e!doc4", 11);
+
+ doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "10,11,10,11");
+
+ vadd("b!doc1", 5);
+ vadd("c!doc2", 10);
+ vadd("d!doc3", 9);
+ vadd("e!doc4", 8);
+
+ doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "10,11,10,11");
+
+ vadd("b!doc1", 24);
+ vadd("c!doc2", 23);
+ vadd("d!doc3", 22);
+ vadd("e!doc4", 21);
+
+ doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "24,23,22,21");
+
+ vdelete("b!doc1", 20);
+
+ doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "24,23,22,21");
+
+ vdelete("b!doc1", 30);
+
+ doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "30,23,22,21");
+
+ // try delete before add
+ vdelete("b!doc123", 100);
+ vadd("b!doc123", 99);
+ doRTG("b!doc123", "100");
+ // now add greater
+ vadd("b!doc123", 101);
+ doRTG("b!doc123", "101");
+
+
+ //
+ // now test with a non-smart client
+ //
+ List<CloudJettyRunner> runners = shardToJetty.get(bucket2);
+ CloudJettyRunner leader = shardToLeaderJetty.get(bucket2);
+ CloudJettyRunner replica = null;
+ for (CloudJettyRunner r : runners) {
+ if (r != leader) replica = r;
+ }
+
+ ss = replica.client.solrClient;
+
+ vadd("b!doc5", 10);
+ vadd("c!doc6", 11);
+ vadd("d!doc7", 10);
+ vadd("e!doc8", 11);
+
+ doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "10,11,10,11");
+
+ vadd("b!doc5", 5);
+ vadd("c!doc6", 10);
+ vadd("d!doc7", 9);
+ vadd("e!doc8", 8);
+
+ doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "10,11,10,11");
+
+ vadd("b!doc5", 24);
+ vadd("c!doc6", 23);
+ vadd("d!doc7", 22);
+ vadd("e!doc8", 21);
+
+ doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "24,23,22,21");
+
+ vdelete("b!doc5", 20);
+
+ doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "24,23,22,21");
+
+ vdelete("b!doc5", 30);
+
+ doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "30,23,22,21");
+
+ // try delete before add
+ vdelete("b!doc1234", 100);
+ vadd("b!doc1234", 99);
+ doRTG("b!doc1234", "100");
+ // now add greater
+ vadd("b!doc1234", 101);
+ doRTG("b!doc1234", "101");
+
+ commit();
+
+ // check liveness for all docs
+ doQuery("b!doc123,101,c!doc2,23,d!doc3,22,e!doc4,21,b!doc1234,101,c!doc6,23,d!doc7,22,e!doc8,21", "q","live_b:true");
+ doQuery("b!doc1,30,b!doc5,30", "q","live_b:false");
+
+ // delete by query should just work like normal
+ doDBQ("id:b!doc1 OR id:e*");
+ commit();
+
+ doQuery("b!doc123,101,c!doc2,23,d!doc3,22,b!doc1234,101,c!doc6,23,d!doc7,22", "q","live_b:true");
+ doQuery("b!doc5,30", "q","live_b:false");
+
+ }
+
+ SolrServer ss;
+
+ void vdelete(String id, long version) throws Exception {
+ UpdateRequest req = new UpdateRequest();
+ req.deleteById(id);
+ req.setParam("del_version", Long.toString(version));
+ ss.request(req);
+ // req.process(cloudClient);
+ }
+
+ void vadd(String id, long version) throws Exception {
+ index("id", id, vfield, version);
+ }
+
+ void doQuery(String expectedDocs, String... queryParams) throws Exception {
+
+ List<String> strs = StrUtils.splitSmart(expectedDocs, ",", true);
+ Map<String, Object> expectedIds = new HashMap<String,Object>();
+ for (int i=0; i<strs.size(); i+=2) {
+ String id = strs.get(i);
+ String vS = strs.get(i+1);
+ Long v = Long.valueOf(vS);
+ expectedIds.put(id,v);
+ }
+
+ QueryResponse rsp = cloudClient.query(params(queryParams));
+ Map<String, Object> obtainedIds = new HashMap<String,Object>();
+ for (SolrDocument doc : rsp.getResults()) {
+ obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+ }
+
+ assertEquals(expectedIds, obtainedIds);
+ }
+
+
+ void doRTG(String ids, String versions) throws Exception {
+ Map<String, Object> expectedIds = new HashMap<String,Object>();
+ List<String> strs = StrUtils.splitSmart(ids, ",", true);
+ List<String> verS = StrUtils.splitSmart(versions, ",", true);
+ for (int i=0; i<strs.size(); i++) {
+ expectedIds.put(strs.get(i), Long.valueOf(verS.get(i)));
+ }
+
+ ss.query(params("qt","/get", "ids",ids));
+
+ QueryResponse rsp = cloudClient.query(params("qt","/get", "ids",ids));
+ Map<String, Object> obtainedIds = new HashMap<String,Object>();
+ for (SolrDocument doc : rsp.getResults()) {
+ obtainedIds.put((String) doc.get("id"), doc.get(vfield));
+ }
+
+ assertEquals(expectedIds, obtainedIds);
+ }
+
+ void doRTG(String ids) throws Exception {
+ ss.query(params("qt","/get", "ids",ids));
+
+ Set<String> expectedIds = new HashSet<String>( StrUtils.splitSmart(ids, ",", true) );
+
+ QueryResponse rsp = cloudClient.query(params("qt","/get", "ids",ids));
+ Set<String> obtainedIds = new HashSet<String>();
+ for (SolrDocument doc : rsp.getResults()) {
+ obtainedIds.add((String) doc.get("id"));
+ }
+
+ assertEquals(expectedIds, obtainedIds);
+ }
+
+
+ // TODO: refactor some of this stuff into the SolrJ client... it should be easier to use
+ void doDBQ(String q, String... reqParams) throws Exception {
+ UpdateRequest req = new UpdateRequest();
+ req.deleteByQuery(q);
+ req.setParams(params(reqParams));
+ req.process(cloudClient);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,321 @@
+package org.apache.solr.search;
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.util.TestHarness;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.noggit.ObjectBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class TestStressUserVersions extends TestRTGBase {
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ initCore("solrconfig-externalversionconstraint.xml","schema15.xml");
+ }
+
+ private static String vfield = "my_version_l";
+ private static String lfield = "live_b";
+ private static String dversion = "del_version";
+
+
+ public static void verbose(Object... args) {
+ // if (!log.isDebugEnabled()) return;
+ StringBuilder sb = new StringBuilder("VERBOSE:");
+ for (Object o : args) {
+ sb.append(' ');
+ sb.append(o==null ? "(null)" : o.toString());
+ }
+ log.info(sb.toString());
+ }
+
+ // This version simulates user versions sometimes being reordered.
+ // It should fail (and currently does) if optimistic concurrency is disabled (cmd.setVersion(currVersion))
+ // in DocBasedVersionConstraintsProcessor.
+ @Test
+ public void testStressReorderVersions() throws Exception {
+ clearIndex();
+ assertU(commit());
+
+ final int commitPercent = 5 + random().nextInt(20);
+ final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4+random().nextInt(25);
+ final int deleteByQueryPercent = random().nextInt(8);
+ final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
+ int nWriteThreads = 5 + random().nextInt(25);
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 75;
+ final AtomicLong operations = new AtomicLong(10000); // number of query operations to perform in total - ramp up for a longer test
+ int nReadThreads = 5 + random().nextInt(25);
+
+
+ /** // testing
+ final int commitPercent = 5;
+ final int softCommitPercent = 100; // what percent of the commits are soft
+ final int deletePercent = 0;
+ final int deleteByQueryPercent = 50;
+ final int ndocs = 1;
+ int nWriteThreads = 2;
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 101;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 1;
+ **/
+
+
+ verbose("commitPercent",commitPercent, "softCommitPercent",softCommitPercent, "deletePercent",deletePercent, "deleteByQueryPercent",deleteByQueryPercent
+ , "ndocs",ndocs,"nWriteThreads",nWriteThreads,"percentRealtimeQuery",percentRealtimeQuery,"operations",operations, "nReadThreads",nReadThreads);
+
+ initModel(ndocs);
+
+ final AtomicInteger numCommitting = new AtomicInteger();
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+
+ final AtomicLong testVersion = new AtomicLong(0);
+
+ for (int i=0; i<nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER"+i) {
+ Random rand = new Random(random().nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.get() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+ Map<Integer,DocInfo> newCommittedModel;
+ long version;
+
+ synchronized(TestStressUserVersions.this) {
+ newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
+ version = snapshotCount++;
+ }
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ verbose("softCommit start");
+ assertU(TestHarness.commit("softCommit","true"));
+ verbose("softCommit end");
+ } else {
+ verbose("hardCommit start");
+ assertU(commit());
+ verbose("hardCommit end");
+ }
+
+ synchronized(TestStressUserVersions.this) {
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ verbose("installing new committedModel version="+committedModelClock);
+ }
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ }
+ numCommitting.decrementAndGet();
+ continue;
+ }
+
+
+ int id;
+
+ if (rand.nextBoolean()) {
+ id = rand.nextInt(ndocs);
+ } else {
+ id = lastId; // reuse the last ID half of the time to force more race conditions
+ }
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ DocInfo info = model.get(id);
+
+ long val = info.val;
+ long nextVal = Math.abs(val)+1;
+
+ // the version we set on the update should determine who wins
+ // These versions are not derived from the actual leader update handler hand hence this
+ // test may need to change depending on how we handle version numbers.
+ long version = testVersion.incrementAndGet();
+
+ // yield after getting the next version to increase the odds of updates happening out of order
+ if (rand.nextBoolean()) Thread.yield();
+
+ if (oper < commitPercent + deletePercent) {
+ verbose("deleting id",id,"val=",nextVal,"version",version);
+
+ Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params(dversion, Long.toString(version)));
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (Math.abs(version) > Math.abs(currInfo.version)) {
+ model.put(id, new DocInfo(version, -nextVal));
+ }
+ }
+
+ verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
+
+ } else {
+ verbose("adding id", id, "val=", nextVal,"version",version);
+
+ Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), vfield, Long.toString(version)), null);
+
+ // only update model if the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (version > currInfo.version) {
+ model.put(id, new DocInfo(version, nextVal));
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
+ }
+
+ }
+ // } // end sync
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ log.error("",e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (int i=0; i<nReadThreads; i++) {
+ Thread thread = new Thread("READER"+i) {
+ Random rand = new Random(random().nextInt());
+
+ @Override
+ public void run() {
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo info;
+
+ if (realTime) {
+ info = model.get(id);
+ } else {
+ synchronized(TestStressUserVersions.this) {
+ info = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ verbose("querying id", id);
+ }
+ SolrQueryRequest sreq;
+ if (realTime) {
+ sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
+ } else {
+ sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+ }
+
+ String response = h.query(sreq);
+ Map rsp = (Map)ObjectBuilder.fromJSON(response);
+ List doclist = (List)(((Map)rsp.get("response")).get("docs"));
+ if (doclist.size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else {
+ assertEquals(1, doclist.size());
+ boolean isLive = (Boolean)(((Map)doclist.get(0)).get(lfield));
+ long foundVer = (Long)(((Map)doclist.get(0)).get(vfield));
+
+ if (isLive) {
+ long foundVal = (Long)(((Map)doclist.get(0)).get(field));
+ if (foundVer < Math.abs(info.version)
+ || (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must
+ log.error("ERROR, id=" + id + " found=" + response + " model" + info);
+ assertTrue(false);
+ }
+ } else {
+ // if the doc is deleted (via tombstone), it shouldn't have a value on it.
+ assertNull( ((Map)doclist.get(0)).get(field) );
+
+ if (foundVer < Math.abs(info.version)) {
+ log.error("ERROR, id=" + id + " found=" + response + " model" + info);
+ assertTrue(false);
+ }
+ }
+
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L);
+ log.error("",e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ }
+
+}
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java?rev=1537587&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java Thu Oct 31 19:13:35 2013
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import org.apache.lucene.util._TestUtil;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ initCore("solrconfig-externalversionconstraint.xml", "schema15.xml");
+ }
+
+ @Before
+ public void before() throws Exception {
+ assertU(delQ("*:*"));
+ assertU(commit());
+ }
+
+
+ public void testSimpleUpdates() throws Exception {
+
+ // skip low version against commited data
+ assertU(adoc("id", "aaa", "name", "a1", "my_version_l", "1001"));
+ assertU(commit());
+ assertU(adoc("id", "aaa", "name", "a2", "my_version_l", "1002"));
+ assertU(commit());
+ assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "1"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+
+ // skip low version against uncommited data from updateLog
+ assertU(adoc("id", "aaa", "name", "a3", "my_version_l", "1003"));
+ assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "7"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a3'}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("q","+id:aaa +name:a3"), "/response/numFound==1");
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a3'}}");
+
+ // interleave updates to multiple docs using same versions
+ for (long ver = 1010; ver < 1020; ver++) {
+ for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+ assertU(adoc("id", id, "my_version_l", ""+ver));
+ }
+ }
+ for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+ assertU(adoc("id", id, "name", "XX", "my_version_l", "10"));
+ assertJQ(req("qt","/get", "id",id, "fl","my_version_l")
+ , "=={'doc':{'my_version_l':"+1019+"}}");
+ }
+ assertU(commit());
+ assertJQ(req("q","name:XX"), "/response/numFound==0");
+ for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+ assertJQ(req("q","+id:"+id), "/response/numFound==1");
+ assertJQ(req("q","+name:XX +id:"+id), "/response/numFound==0");
+ assertJQ(req("q","+id:"+id + " +my_version_l:1019"), "/response/numFound==1");
+ assertJQ(req("qt","/get", "id",id, "fl","my_version_l")
+ , "=={'doc':{'my_version_l':"+1019+"}}");
+ }
+ }
+
+ public void testSimpleDeletes() throws Exception {
+
+ // skip low version delete against commited doc
+ assertU(adoc("id", "aaa", "name", "a1", "my_version_l", "1001"));
+ assertU(commit());
+ assertU(adoc("id", "aaa", "name", "a2", "my_version_l", "1002"));
+ assertU(commit());
+ deleteAndGetVersion("aaa",
+ params("del_version", "7"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+
+ // skip low version delete against uncommited doc from updateLog
+ assertU(adoc("id", "aaa", "name", "a3", "my_version_l", "1003"));
+ deleteAndGetVersion("aaa",
+ params("del_version", "8"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a3'}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:a3"), "/response/numFound==1");
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a3'}}");
+
+ // skip low version add against uncommited "delete" from updateLog
+ deleteAndGetVersion("aaa", params("del_version", "1010"));
+ assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "22"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}");
+
+ // skip low version add against committed "delete"
+ // (delete was already done & committed above)
+ assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "23"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}");
+ }
+
+ /**
+ * Sanity check that there are no hardcoded assumptions about the
+ * field type used that could byte us in the ass.
+ */
+ public void testFloatVersionField() throws Exception {
+
+ // skip low version add & low version delete against commited doc
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_f", "10.01")),
+ params("update.chain","external-version-float"));
+ assertU(commit());
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "4.2")),
+ params("update.chain","external-version-float"));
+ assertU(commit());
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a1'}}");
+ deleteAndGetVersion("aaa", params("del_version", "7",
+ "update.chain","external-version-float"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a1'}}");
+ assertU(commit());
+
+ // skip low version delete against uncommited doc from updateLog
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "a2", "my_version_f", "10.02")),
+ params("update.chain","external-version-float"));
+ deleteAndGetVersion("aaa", params("del_version", "8",
+ "update.chain","external-version-float"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+
+ // skip low version add against uncommited "delete" from updateLog
+ deleteAndGetVersion("aaa", params("del_version", "10.10",
+ "update.chain","external-version-float"));
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "10.05")),
+ params("update.chain","external-version-float"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+ , "=={'doc':{'my_version_f':10.10}}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+ , "=={'doc':{'my_version_f':10.10}}");
+
+ // skip low version add against committed "delete"
+ // (delete was already done & committed above)
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "10.09")),
+ params("update.chain","external-version-float"));
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+ , "=={'doc':{'my_version_f':10.10}}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
+ , "=={'doc':{'my_version_f':10.10}}");
+ }
+
+ public void testFailOnOldVersion() throws Exception {
+
+ // fail low version add & low version delete against commited doc
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_l", "1001")),
+ params("update.chain","external-version-failhard"));
+ assertU(commit());
+ try {
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "42")),
+ params("update.chain","external-version-failhard"));
+ fail("no 409");
+ } catch (SolrException ex) {
+ assertEquals(409, ex.code());
+ }
+ assertU(commit());
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a1'}}");
+ try {
+ deleteAndGetVersion("aaa", params("del_version", "7",
+ "update.chain","external-version-failhard"));
+ fail("no 409");
+ } catch (SolrException ex) {
+ assertEquals(409, ex.code());
+ }
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a1'}}");
+ assertU(commit());
+
+ // fail low version delete against uncommited doc from updateLog
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "a2", "my_version_l", "1002")),
+ params("update.chain","external-version-failhard"));
+ try {
+ deleteAndGetVersion("aaa", params("del_version", "8",
+ "update.chain","external-version-failhard"));
+ fail("no 409");
+ } catch (SolrException ex) {
+ assertEquals(409, ex.code());
+ }
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
+ assertJQ(req("qt","/get", "id","aaa", "fl","name")
+ , "=={'doc':{'name':'a2'}}");
+
+ // fail low version add against uncommited "delete" from updateLog
+ deleteAndGetVersion("aaa", params("del_version", "1010",
+ "update.chain","external-version-failhard"));
+ try {
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "1005")),
+ params("update.chain","external-version-failhard"));
+ fail("no 409");
+ } catch (SolrException ex) {
+ assertEquals(409, ex.code());
+ }
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}");
+
+ // fail low version add against committed "delete"
+ // (delete was already done & committed above)
+ try {
+ updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "1009")),
+ params("update.chain","external-version-failhard"));
+ fail("no 409");
+ } catch (SolrException ex) {
+ assertEquals(409, ex.code());
+ }
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}}");
+ assertU(commit());
+ assertJQ(req("q","+id:aaa"), "/response/numFound==1");
+ assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
+ assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
+ , "=={'doc':{'my_version_l':1010}}");
+ }
+
+
+ /**
+ * Proof of concept test demonstrating how to manage and periodically cleanup
+ * the "logically" deleted documents
+ */
+ public void testManagingDeletes() throws Exception {
+ // add some docs
+ for (long ver = 1010; ver < 1020; ver++) {
+ for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
+ assertU(adoc("id", id, "name", "name_"+id, "my_version_l", ""+ver));
+ }
+ }
+ assertU(adoc("id", "aaa", "name", "name_aaa", "my_version_l", "1030"));
+ assertU(commit());
+ // sample queries
+ assertJQ(req("q","*:*",
+ "fq","live_b:true")
+ ,"/response/numFound==4");
+ assertJQ(req("q","id:aaa",
+ "fq","live_b:true",
+ "fl","id,my_version_l")
+ ,"/response/numFound==1"
+ ,"/response/docs==[{'id':'aaa','my_version_l':1030}]}");
+ // logically delete
+ deleteAndGetVersion("aaa",
+ params("del_version", "1031"));
+ assertU(commit());
+ // sample queries
+ assertJQ(req("q","*:*",
+ "fq","live_b:true")
+ ,"/response/numFound==3");
+ assertJQ(req("q","id:aaa",
+ "fq","live_b:true")
+ ,"/response/numFound==0");
+ // placeholder doc is still in the index though
+ assertJQ(req("q","id:aaa",
+ "fq","live_b:false",
+ "fq", "timestamp_tdt:[* TO *]",
+ "fl","id,live_b,my_version_l")
+ ,"/response/numFound==1"
+ ,"/response/docs==[{'id':'aaa','my_version_l':1031,'live_b':false}]}");
+ // doc can't be re-added with a low version
+ assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "1025"));
+ assertU(commit());
+ assertJQ(req("q","id:aaa",
+ "fq","live_b:true")
+ ,"/response/numFound==0");
+
+ // "dead" placeholder docs can be periodically cleaned up
+ // ie: assertU(delQ("+live_b:false +timestamp_tdt:[* TO NOW/MINUTE-5MINUTE]"));
+ // but to prevent the test from ebing time sensitive we'll just purge them all
+ assertU(delQ("+live_b:false"));
+ assertU(commit());
+ // now doc can be re-added w/any version, no matter how low
+ assertU(adoc("id", "aaa", "name", "aaa", "my_version_l", "7"));
+ assertU(commit());
+ assertJQ(req("q","id:aaa",
+ "fq","live_b:true",
+ "fl","id,live_b,my_version_l")
+ ,"/response/numFound==1"
+ ,"/response/docs==[{'id':'aaa','my_version_l':7,'live_b':true}]}");
+
+ }
+
+ /**
+ * Constantly hammer the same doc with multiple concurrent threads and diff versions,
+ * confirm that the highest version wins.
+ */
+ public void testConcurrentAdds() throws Exception {
+ final int NUM_DOCS = atLeast(50);
+ final int MAX_CONCURENT = atLeast(10);
+ ExecutorService runner = Executors.newFixedThreadPool(MAX_CONCURENT);
+ // runner = Executors.newFixedThreadPool(1); // to test single threaded
+ try {
+ for (int id = 0; id < NUM_DOCS; id++) {
+ final int numAdds = _TestUtil.nextInt(random(),3,MAX_CONCURENT);
+ final int winner = _TestUtil.nextInt(random(),0,numAdds-1);
+ final int winnerVersion = atLeast(100);
+ final boolean winnerIsDeleted = (0 == _TestUtil.nextInt(random(),0,4));
+ List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(numAdds);
+ for (int variant = 0; variant < numAdds; variant++) {
+ final boolean iShouldWin = (variant==winner);
+ final long version = (iShouldWin ? winnerVersion
+ : _TestUtil.nextInt(random(),1,winnerVersion-1));
+ if ((iShouldWin && winnerIsDeleted)
+ || (!iShouldWin && 0 == _TestUtil.nextInt(random(),0,4))) {
+ tasks.add(delayedDelete(""+id, ""+version));
+ } else {
+ tasks.add(delayedAdd("id",""+id,"name","name"+id+"_"+variant,
+ "my_version_l", ""+ version));
+ }
+ }
+ runner.invokeAll(tasks);
+ final String expectedDoc = "{'id':'"+id+"','my_version_l':"+winnerVersion +
+ ( ! winnerIsDeleted ? ",'name':'name"+id+"_"+winner+"'}" : "}");
+
+ assertJQ(req("qt","/get", "id",""+id, "fl","id,name,my_version_l")
+ , "=={'doc':" + expectedDoc + "}");
+ assertU(commit());
+ assertJQ(req("q","id:"+id,
+ "fl","id,name,my_version_l")
+ ,"/response/numFound==1"
+ ,"/response/docs==["+expectedDoc+"]");
+ }
+ } finally {
+ runner.shutdownNow();
+ }
+ }
+
+ private Callable<Object> delayedAdd(final String... fields) {
+ return Executors.callable(new Runnable() {
+ public void run() {
+ // log.info("ADDING DOC: " + adoc(fields));
+ assertU(adoc(fields));
+ }
+ });
+ }
+ private Callable<Object> delayedDelete(final String id, final String externalVersion) {
+ return Executors.callable(new Runnable() {
+ public void run() {
+ try {
+ // Why does this throw "Exception" ???
+ // log.info("DELETING DOC: " + id + " v="+externalVersion);
+ deleteAndGetVersion(id, params("del_version", externalVersion));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+}