You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2017/01/26 07:34:35 UTC
[10/12] lucene-solr:apiv2: SOLR-5944: In-place updates of Numeric
DocValues
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index c62a90a..8f5d909 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -36,7 +36,13 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.SolrRequest.METHOD;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
@@ -82,9 +88,11 @@ import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.update.VersionBucket;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +106,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public static final String DISTRIB_FROM_COLLECTION = "distrib.from.collection";
public static final String DISTRIB_FROM_PARENT = "distrib.from.parent";
public static final String DISTRIB_FROM = "distrib.from";
+ public static final String DISTRIB_INPLACE_PREVVERSION = "distrib.inplace.prevversion";
private static final String TEST_DISTRIB_SKIP_SERVERS = "test.distrib.skip.servers";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -726,7 +735,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
-
+
+ // If we were sent a previous version, set this to the AddUpdateCommand (if not already set)
+ if (!cmd.isInPlaceUpdate()) {
+ cmd.prevVersion = cmd.getReq().getParams().getLong(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, -1);
+ }
// TODO: if minRf > 1 and we know the leader is the only active replica, we could fail
// the request right here but for now I think it is better to just return the status
// to the client that the minRf wasn't reached and let them handle it
@@ -783,7 +796,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (replicationTracker != null && minRf > 1)
params.set(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
-
+
+ if (cmd.isInPlaceUpdate()) {
+ params.set(DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
+ }
cmdDistrib.distribAdd(cmd, nodes, params, false, replicationTracker);
}
@@ -1011,9 +1027,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
VersionBucket bucket = vinfo.bucket(bucketHash);
+ long dependentVersionFound = -1; // Last found version for a dependent update; applicable only for in-place updates; useful for logging later
+ // if this is an inplace update, check and wait if we should be waiting for a dependent update, before
+ // entering the synchronized block
+ if (!leaderLogic && cmd.isInPlaceUpdate()) {
+ dependentVersionFound = waitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket);
+ if (dependentVersionFound == -1) {
+ // it means in leader, the document has been deleted by now. drop this update
+ return true;
+ }
+ }
+
vinfo.lockForUpdate();
try {
synchronized (bucket) {
+ bucket.notifyAll(); //just in case anyone is waiting let them know that we have a new update
// we obtain the version when synchronized and then do the add so we can ensure that
// if version1 < version2 then version1 is actually added before version2.
@@ -1078,23 +1106,69 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return true;
}
- // if we aren't the leader, then we need to check that updates were not re-ordered
- if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
- // we're OK... this update has a version higher than anything we've seen
- // in this bucket so far, so we know that no reordering has yet occurred.
- bucket.updateHighest(versionOnUpdate);
- } else {
- // there have been updates higher than the current update. we need to check
- // the specific version for this id.
+ if (cmd.isInPlaceUpdate()) {
+ long prev = cmd.prevVersion;
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
- log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
- return true;
+ if (lastVersion == null || Math.abs(lastVersion) < prev) {
+ // this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
+ // So we shouldn't be here, unless what must've happened is:
+ // by the time synchronization block was entered, the prev update was deleted by DBQ. Since
+ // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
+ // from the deleted list (which might be older than the prev update!)
+ UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
+
+ if (fetchedFromLeader instanceof DeleteUpdateCommand) {
+ log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
+ + " was deleted at the leader subsequently.", idBytes.utf8ToString());
+ versionDelete((DeleteUpdateCommand)fetchedFromLeader);
+ return true;
+ } else {
+ assert fetchedFromLeader instanceof AddUpdateCommand;
+ // Newer document was fetched from the leader. Apply that document instead of this current in-place update.
+ log.info("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
+ idBytes.utf8ToString(), (fetchedFromLeader == null? null: ((AddUpdateCommand)fetchedFromLeader).solrDoc));
+
+ // Make this update to become a non-inplace update containing the full document obtained from the leader
+ cmd.solrDoc = ((AddUpdateCommand)fetchedFromLeader).solrDoc;
+ cmd.prevVersion = -1;
+ cmd.setVersion((long)cmd.solrDoc.getFieldValue(VERSION_FIELD));
+ assert cmd.isInPlaceUpdate() == false;
+ }
+ } else {
+ if (lastVersion != null && Math.abs(lastVersion) > prev) {
+ // this means we got a newer full doc update and in that case it makes no sense to apply the older
+ // inplace update. Drop this update
+ log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
+ + ". Dropping current update.");
+ return true;
+ } else {
+ // We're good, we should apply this update. First, update the bucket's highest.
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ bucket.updateHighest(versionOnUpdate);
+ }
+ }
}
+ }
+
+ if (!cmd.isInPlaceUpdate()) {
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+ return true;
+ }
- // also need to re-apply newer deleteByQuery commands
- checkDeleteByQueries = true;
+ // also need to re-apply newer deleteByQuery commands
+ checkDeleteByQueries = true;
+ }
}
}
}
@@ -1120,11 +1194,161 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return false;
}
+ /**
+ * Looks up the last known version of the document targeted by an in-place update and checks whether the update that this
+ * in-place update depends on (the one with version cmd.prevVersion) has already been seen locally. If not, this method
+ * waits on the version bucket (for at most 5 seconds) for the missing update to arrive. If it still has not arrived,
+ * the full document is fetched from the leader and applied here (or deleted here, if the leader no longer has it).
+ *
+ * @return -1 if the current in-place update should be dropped; otherwise the version of the dependent update found locally, or the version of the document fetched from the leader.
+ */
+ private long waitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate,
+ boolean isReplayOrPeersync, VersionBucket bucket) throws IOException {
+ long lastFoundVersion = 0;
+ TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS); // upper bound on how long we wait for the dependent update
+
+ vinfo.lockForUpdate();
+ try {
+ synchronized (bucket) {
+ Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+
+ if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
+ log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
+ (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion, isReplayOrPeersync, cmd.getPrintableId());
+ }
+
+ while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
+ try {
+ long timeLeft = waitTimeout.timeLeft(TimeUnit.MILLISECONDS);
+ if (timeLeft > 0) { // wait(0) would block until notified, with no timeout; we never want that.
+ bucket.wait(timeLeft);
+ }
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie); // NOTE(review): the thread's interrupt status is not restored before rethrowing
+ }
+ lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+ }
+ }
+ } finally {
+ vinfo.unlockForUpdate();
+ }
+
+ if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
+ // This must've been the case due to a higher version full update succeeding concurrently, while we were waiting or
+ // trying to index this partial update. Since a full update more recent than this partial update has succeeded,
+ // we can drop the current update.
+ if (log.isDebugEnabled()) {
+ log.debug("Update was applied on version: {}, but last version I have is: {}"
+ + ". Current update should be dropped. id={}", cmd.prevVersion, lastFoundVersion, cmd.getPrintableId());
+ }
+ return -1;
+ } else if (Math.abs(lastFoundVersion) == cmd.prevVersion) {
+ assert 0 < lastFoundVersion : "prevVersion " + cmd.prevVersion + " found but is a delete!";
+ if (log.isDebugEnabled()) {
+ log.debug("Dependent update found. id={}", cmd.getPrintableId());
+ }
+ return lastFoundVersion;
+ }
+
+ // We have waited long enough, but the dependent update didn't arrive. It's time to actively fetch it from the leader.
+ log.info("Missing update, on which current in-place update depends on, hasn't arrived. id={}, looking for version={}, last found version={}",
+ cmd.getPrintableId(), cmd.prevVersion, lastFoundVersion);
+
+ UpdateCommand missingUpdate = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
+ if (missingUpdate instanceof DeleteUpdateCommand) {
+ log.info("Tried to fetch document {} from the leader, but the leader says document has been deleted. "
+ + "Deleting the document here and skipping this update: Last found version: {}, was looking for: {}", cmd.getPrintableId(), lastFoundVersion, cmd.prevVersion);
+ versionDelete((DeleteUpdateCommand)missingUpdate);
+ return -1;
+ } else {
+ assert missingUpdate instanceof AddUpdateCommand;
+ log.info("Fetched the document: {}", ((AddUpdateCommand)missingUpdate).getSolrInputDocument());
+ versionAdd((AddUpdateCommand)missingUpdate);
+ log.info("Added the fetched document, id="+((AddUpdateCommand)missingUpdate).getPrintableId()+", version="+missingUpdate.getVersion());
+ }
+ return missingUpdate.getVersion();
+ }
+
+ /**
+ * This method is used when an update that a particular in-place update depends on has been lost for some reason. This method
+ * sends a request to the shard leader to fetch the latest full document as seen on the leader.
+ * @return AddUpdateCommand containing latest full doc at shard leader for the given id, or a DeleteUpdateCommand for that id if the leader returned no document.
+ */
+ private UpdateCommand fetchFullUpdateFromLeader(AddUpdateCommand inplaceAdd, long versionOnUpdate) throws IOException {
+ String id = inplaceAdd.getPrintableId();
+ UpdateShardHandler updateShardHandler = inplaceAdd.getReq().getCore().getCoreDescriptor().getCoreContainer().getUpdateShardHandler();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("distrib", false);
+ params.set("getInputDocument", id);
+ params.set("onlyIfActive", true);
+ SolrRequest<SimpleSolrResponse> ur = new GenericSolrRequest(METHOD.GET, "/get", params);
+
+ String leaderUrl = req.getParams().get(DISTRIB_FROM); // presumably the URL of the node that forwarded this update -- TODO confirm
+
+ if (leaderUrl == null) {
+ // An update we're dependent upon didn't arrive, and the request does not say where it came from. This is
+ // unexpected; perhaps our leader is down or partitioned from us for some reason. Let's look up the shard
+ // leader via getLeaderRetry() and ask it for the update.
+ if (zkController == null) { // we should be in cloud mode; a null zkController is unexpected (could be a unit test)
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id + ", but fetching from leader "
+ + "failed since we're not in cloud mode.");
+ }
+ Replica leader;
+ try {
+ leader = zkController.getZkStateReader().getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+ } catch (InterruptedException e) { // NOTE(review): the thread's interrupt status is not restored here
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Exception during fetching from leader.", e);
+ }
+ leaderUrl = leader.getCoreUrl();
+ }
+
+ HttpSolrClient hsc = new HttpSolrClient.Builder(leaderUrl).
+ withHttpClient(updateShardHandler.getHttpClient()).build();
+ NamedList rsp = null;
+ try {
+ rsp = hsc.request(ur);
+ } catch (SolrServerException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error during fetching [" + id +
+ "] from leader (" + leaderUrl + "): ", e);
+ } finally {
+ hsc.close();
+ }
+ Object inputDocObj = rsp.get("inputDocument");
+ Long version = (Long)rsp.get("version");
+ SolrInputDocument leaderDoc = (SolrInputDocument) inputDocObj;
+
+ if (leaderDoc == null) {
+ // This doc was not found (deleted) on the leader. Let's delete it here as well.
+ DeleteUpdateCommand del = new DeleteUpdateCommand(inplaceAdd.getReq());
+ del.setIndexedId(inplaceAdd.getIndexedId());
+ del.setId(inplaceAdd.getIndexedId().utf8ToString());
+ del.setVersion((version == null || version == 0)? -versionOnUpdate: version); // no usable version in the response: fall back to the negated versionOnUpdate
+ return del;
+ }
+
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ cmd.solrDoc = leaderDoc;
+ cmd.setVersion((long)leaderDoc.getFieldValue(VERSION_FIELD));
+ return cmd;
+ }
+
// TODO: may want to switch to using optimistic locking in the future for better concurrency
// that's why this code is here... need to retry in a loop closely around/in versionAdd
boolean getUpdatedDocument(AddUpdateCommand cmd, long versionOnUpdate) throws IOException {
if (!AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) return false;
+ Set<String> inPlaceUpdatedFields = AtomicUpdateDocumentMerger.computeInPlaceUpdatableFields(cmd);
+ if (inPlaceUpdatedFields.size() > 0) { // non-empty means this is suitable for in-place updates
+ if (docMerger.doInPlaceUpdateMerge(cmd, inPlaceUpdatedFields)) {
+ return true;
+ } else {
+ // in-place update failed, so fall through and re-try the same with a full atomic update
+ }
+ }
+
+ // full (non-inplace) atomic update
SolrInputDocument sdoc = cmd.getSolrInputDocument();
BytesRef id = cmd.getIndexedId();
SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), id);
@@ -1140,7 +1364,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
} else {
oldDoc.remove(VERSION_FIELD);
}
-
+
cmd.solrDoc = docMerger.merge(sdoc, oldDoc);
return true;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
index c21ea76..b089c94 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java
@@ -261,7 +261,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
SolrInputDocument oldDoc = null;
if (useFieldCache) {
- oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId);
+ oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId, null, null, true);
if (oldDoc == RealTimeGetComponent.DELETED) {
return true;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java
index ec637a4..2c58410 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/SkipExistingDocumentsProcessorFactory.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.Collections;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -183,7 +184,9 @@ public class SkipExistingDocumentsProcessorFactory extends UpdateRequestProcesso
boolean doesDocumentExist(BytesRef indexedDocId) throws IOException {
assert null != indexedDocId;
- SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId);
+ // we don't need any fields populated, we just need to know if the doc is in the tlog...
+ SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId, null,
+ Collections.<String>emptySet(), false);
if (oldDoc == RealTimeGetComponent.DELETED) {
return false;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml b/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml
new file mode 100644
index 0000000..4ed48f6
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/schema-inplace-updates.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<schema name="inplace-updates" version="1.6">
+
+ <uniqueKey>id</uniqueKey>
+ <field name="id" type="string" indexed="true" stored="true" docValues="true"/>
+ <field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
+
+ <!-- specific schema fields for dv in-place updates -->
+ <field name="inplace_updatable_float" type="float" indexed="false" stored="false" docValues="true" />
+ <field name="inplace_updatable_int" type="int" indexed="false" stored="false" docValues="true" />
+
+ <field name="inplace_updatable_float_with_default"
+ type="float" indexed="false" stored="false" docValues="true" default="42.0"/>
+ <field name="inplace_updatable_int_with_default"
+ type="int" indexed="false" stored="false" docValues="true" default="666"/>
+
+ <!-- dynamic fields which *ONLY* use docValues so they can be updated in place -->
+ <dynamicField name="*_i_dvo" multiValued="false" type="int" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_f_dvo" multiValued="false" type="float" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_l_dvo" multiValued="false" type="long" docValues="true" indexed="false" stored="false"/>
+
+ <!-- dynamic fields that must *NOT* support in place updates -->
+ <dynamicField name="*_s" type="string" indexed="true" stored="true"/>
+ <dynamicField name="*_i" type="int" indexed="true" stored="true" docValues="true"/>
+ <dynamicField name="*_l" type="long" indexed="true" stored="true" docValues="true"/>
+
+ <!-- Copy fields -->
+
+ <!-- The id field has a non in-place updatable copy target, but in-place updates should still work. -->
+ <copyField source="id" dest="id_field_copy_that_does_not_support_in_place_update_s"/>
+
+ <!-- copyfield1: src and dest are both updatable -->
+ <field name="copyfield1_src__both_updatable" type="int" indexed="false" stored="false" docValues="true" />
+ <copyField source="copyfield1_src__both_updatable" dest="copyfield1_dest__both_updatable_i_dvo"/>
+
+ <!-- copyfield2: src is updatable but dest is not -->
+ <field name="copyfield2_src__only_src_updatable" type="int" indexed="false" stored="false" docValues="true" />
+ <copyField source="copyfield2_src__only_src_updatable" dest="copyfield2_dest__only_src_updatable_i"/>
+
+
+ <!-- cruft needed by the solrconfig used in our tests for startup, but not used in the tests -->
+ <field name="signatureField" type="string" indexed="true" stored="false"/>
+ <dynamicField name="*_sS" type="string" indexed="true" stored="true"/>
+
+
+ <fieldType name="string" class="solr.StrField" multiValued="false" indexed="false" stored="false" docValues="false" />
+ <fieldType name="long" class="solr.${solr.tests.longClassName}" multiValued="false" indexed="false" stored="false" docValues="false"/>
+ <fieldType name="float" class="solr.${solr.tests.floatClassName}" multiValued="false" indexed="false" stored="false" docValues="false"/>
+ <fieldType name="int" class="solr.${solr.tests.intClassName}" multiValued="false" indexed="false" stored="false" docValues="false"/>
+
+</schema>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test-files/solr/collection1/conf/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/schema.xml b/solr/core/src/test-files/solr/collection1/conf/schema.xml
index be1b6f5..aef6c4c 100644
--- a/solr/core/src/test-files/solr/collection1/conf/schema.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/schema.xml
@@ -572,6 +572,8 @@
<field name="timestamp" type="date" indexed="true" stored="true" docValues="true" default="NOW" multiValued="false"/>
<field name="multiDefault" type="string" indexed="true" stored="true" default="muLti-Default" multiValued="true"/>
<field name="intDefault" type="int" indexed="true" stored="true" default="42" multiValued="false"/>
+ <field name="intDvoDefault" type="int" indexed="false" stored="false" multiValued="false"
+ useDocValuesAsStored="true" docValues="true" default="42" />
<field name="intRemove" type="int" indexed="true" stored="true" multiValued="true"/>
<field name="dateRemove" type="date" indexed="true" stored="true" multiValued="true"/>
<field name="floatRemove" type="float" indexed="true" stored="true" multiValued="true"/>
@@ -580,7 +582,7 @@
<field name="tlong" type="tlong" indexed="true" stored="true"/>
- <field name="_version_" type="long" indexed="true" stored="true" multiValued="false"/>
+ <field name="_version_" type="long" indexed="false" stored="false" docValues="true" multiValued="false" useDocValuesAsStored="true"/>
<field name="title_stringNoNorms" type="string" omitNorms="true" indexed="true" stored="true"/>
@@ -685,15 +687,15 @@
<dynamicField name="*_f1_dv" type="${solr.tests.floatClass:pfloat}" indexed="true" stored="true" docValues="true" multiValued="false"/>
<!-- Non-stored, DocValues=true -->
- <dynamicField name="*_i_dvo" multiValued="false" type="${solr.tests.intClass:pint}" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_i_dvo" multiValued="false" type="${solr.tests.intClass:pint}" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_d_dvo" multiValued="false" type="${solr.tests.doubleClass:pdouble}" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_d_dvo" multiValued="false" type="${solr.tests.doubleClass:pdouble}" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_s_dvo" multiValued="false" type="string" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_s_dvo" multiValued="false" type="string" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_ii_dvo" multiValued="true" type="int" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_ii_dvo" multiValued="true" type="int" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
- <dynamicField name="*_dd_dvo" multiValued="true" type="double" docValues="true" indexed="true" stored="false"
+ <dynamicField name="*_dd_dvo" multiValued="true" type="double" docValues="true" indexed="false" stored="false"
useDocValuesAsStored="true"/>
<!-- Non-stored, DocValues=true, useDocValuesAsStored=false -->
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test-files/solr/collection1/conf/schema15.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/schema15.xml b/solr/core/src/test-files/solr/collection1/conf/schema15.xml
index d545149..e2c14f0 100644
--- a/solr/core/src/test-files/solr/collection1/conf/schema15.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/schema15.xml
@@ -529,7 +529,7 @@
<field name="copyfield_source" type="string" indexed="true" stored="true" multiValued="true"/>
<!-- for versioning -->
- <field name="_version_" type="long" indexed="true" stored="true"/>
+ <field name="_version_" type="long" indexed="false" stored="false" docValues="true"/>
<!-- points to the root document of a block of nested documents -->
<field name="_root_" type="string" indexed="true" stored="true"/>
@@ -545,6 +545,11 @@
<dynamicField name="tv_mv_*" type="text" indexed="true" stored="true" multiValued="true"
termVectors="true" termPositions="true" termOffsets="true"/>
+ <!-- for in-place updates -->
+ <dynamicField name="*_i_dvo" multiValued="false" type="int" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_f_dvo" multiValued="false" type="float" docValues="true" indexed="false" stored="false"/>
+ <dynamicField name="*_l_dvo" multiValued="false" type="long" docValues="true" indexed="false" stored="false"/>
+
<dynamicField name="*_mfacet" type="string" indexed="true" stored="false" multiValued="true"/>
<dynamicField name="*_sw" type="text_sw" indexed="true" stored="true" multiValued="true"/>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test-files/solr/collection1/conf/solrconfig-sortingmergepolicyfactory.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-sortingmergepolicyfactory.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-sortingmergepolicyfactory.xml
index a990719..3746827 100644
--- a/solr/core/src/test-files/solr/collection1/conf/solrconfig-sortingmergepolicyfactory.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-sortingmergepolicyfactory.xml
@@ -26,8 +26,9 @@
<mergePolicyFactory class="org.apache.solr.index.SortingMergePolicyFactory">
<str name="wrapped.prefix">in</str>
<str name="in.class">org.apache.solr.util.RandomForceMergePolicyFactory</str>
- <str name="sort">timestamp desc</str>
+ <str name="sort">timestamp_i_dvo desc</str>
</mergePolicyFactory>
+ <lockType>${solr.tests.lockType:single}</lockType>
</indexConfig>
<requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test/org/apache/solr/cloud/SegmentTerminateEarlyTestState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/SegmentTerminateEarlyTestState.java b/solr/core/src/test/org/apache/solr/cloud/SegmentTerminateEarlyTestState.java
index b3df9e7..3fe12ed 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SegmentTerminateEarlyTestState.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SegmentTerminateEarlyTestState.java
@@ -17,8 +17,6 @@
package org.apache.solr.cloud;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -37,9 +35,13 @@ import org.apache.solr.response.SolrQueryResponse;
class SegmentTerminateEarlyTestState {
final String keyField = "id";
- final String timestampField = "timestamp";
- final String oddField = "odd_l1"; // <dynamicField name="*_l1" type="long" indexed="true" stored="true" multiValued="false"/>
- final String quadField = "quad_l1"; // <dynamicField name="*_l1" type="long" indexed="true" stored="true" multiValued="false"/>
+
+ // for historic reasons, this is referred to as a "timestamp" field, but in actuality is just an int
+ // value representing a number of "minutes" between 0-60.
+ // aka: I decided not to rename a million things while refactoring this test
+ public static final String timestampField = "timestamp_i_dvo";
+ public static final String oddField = "odd_l1"; // <dynamicField name="*_l1" type="long" indexed="true" stored="true" multiValued="false"/>
+ public static final String quadField = "quad_l1"; // <dynamicField name="*_l1" type="long" indexed="true" stored="true" multiValued="false"/>
final Set<Integer> minTimestampDocKeys = new HashSet<>();
final Set<Integer> maxTimestampDocKeys = new HashSet<>();
@@ -77,7 +79,7 @@ class SegmentTerminateEarlyTestState {
maxTimestampMM = new Integer(MM);
maxTimestampDocKeys.add(docKey);
}
- doc.setField(timestampField, ZonedDateTime.of(2016, 1, 1, 0, MM, 0, 0, ZoneOffset.UTC).toInstant().toString());
+ doc.setField(timestampField, (Integer)MM);
doc.setField(oddField, ""+(numDocs % 2));
doc.setField(quadField, ""+(numDocs % 4)+1);
cloudSolrClient.add(doc);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java b/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
index 016b63e..5e6283a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSegmentSorting.java
@@ -17,19 +17,26 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
-import org.apache.lucene.index.TieredMergePolicy;
-import org.apache.solr.SolrTestCaseJ4;
+import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.index.TieredMergePolicyFactory;
import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,78 +47,55 @@ public class TestSegmentSorting extends SolrCloudTestCase {
private static final int NUM_SERVERS = 5;
private static final int NUM_SHARDS = 2;
private static final int REPLICATION_FACTOR = 2;
-
+ private static final String configName = MethodHandles.lookup().lookupClass() + "_configSet";
+
@BeforeClass
public static void setupCluster() throws Exception {
- configureCluster(NUM_SERVERS).configure();
+ configureCluster(NUM_SERVERS)
+ .addConfig(configName, Paths.get(TEST_HOME(), "collection1", "conf"))
+ .configure();
}
-
+
+ @Rule public TestName testName = new TestName();
+
@After
public void ensureClusterEmpty() throws Exception {
cluster.deleteAllCollections();
cluster.getSolrClient().setDefaultCollection(null);
}
-
- private void createCollection(MiniSolrCloudCluster miniCluster, String collectionName, String createNodeSet, String asyncId,
- Boolean indexToPersist, Map<String,String> collectionProperties) throws Exception {
- String configName = "solrCloudCollectionConfig";
- miniCluster.uploadConfigSet(SolrTestCaseJ4.TEST_PATH().resolve("collection1").resolve("conf"), configName);
-
- final boolean persistIndex = (indexToPersist != null ? indexToPersist.booleanValue() : random().nextBoolean());
- if (collectionProperties == null) {
- collectionProperties = new HashMap<>();
- }
- collectionProperties.putIfAbsent(CoreDescriptor.CORE_CONFIG, "solrconfig-tlog.xml");
- collectionProperties.putIfAbsent("solr.tests.maxBufferedDocs", "100000");
- collectionProperties.putIfAbsent("solr.tests.ramBufferSizeMB", "100");
- // use non-test classes so RandomizedRunner isn't necessary
+
+ @Before
+ public void createCollection() throws Exception {
+
+ final String collectionName = testName.getMethodName();
+ final CloudSolrClient cloudSolrClient = cluster.getSolrClient();
+
+ final Map<String, String> collectionProperties = new HashMap<>();
+ collectionProperties.put(CoreDescriptor.CORE_CONFIG, "solrconfig-sortingmergepolicyfactory.xml");
+
+ CollectionAdminRequest.Create cmd =
+ CollectionAdminRequest.createCollection(collectionName, configName,
+ NUM_SHARDS, REPLICATION_FACTOR)
+ .setProperties(collectionProperties);
+
if (random().nextBoolean()) {
- collectionProperties.putIfAbsent(SolrTestCaseJ4.SYSTEM_PROPERTY_SOLR_TESTS_MERGEPOLICY, TieredMergePolicy.class.getName());
- collectionProperties.putIfAbsent(SolrTestCaseJ4.SYSTEM_PROPERTY_SOLR_TESTS_USEMERGEPOLICY, "true");
- collectionProperties.putIfAbsent(SolrTestCaseJ4.SYSTEM_PROPERTY_SOLR_TESTS_USEMERGEPOLICYFACTORY, "false");
- } else {
- collectionProperties.putIfAbsent(SolrTestCaseJ4.SYSTEM_PROPERTY_SOLR_TESTS_MERGEPOLICYFACTORY, TieredMergePolicyFactory.class.getName());
- collectionProperties.putIfAbsent(SolrTestCaseJ4.SYSTEM_PROPERTY_SOLR_TESTS_USEMERGEPOLICYFACTORY, "true");
- collectionProperties.putIfAbsent(SolrTestCaseJ4.SYSTEM_PROPERTY_SOLR_TESTS_USEMERGEPOLICY, "false");
- }
- collectionProperties.putIfAbsent("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
- collectionProperties.putIfAbsent("solr.directoryFactory", (persistIndex ? "solr.StandardDirectoryFactory" : "solr.RAMDirectoryFactory"));
-
- if (asyncId == null) {
- CollectionAdminRequest.createCollection(collectionName, configName, NUM_SHARDS, REPLICATION_FACTOR)
- .setCreateNodeSet(createNodeSet)
- .setProperties(collectionProperties)
- .process(miniCluster.getSolrClient());
- }
- else {
- CollectionAdminRequest.createCollection(collectionName, configName, NUM_SHARDS, REPLICATION_FACTOR)
- .setCreateNodeSet(createNodeSet)
- .setProperties(collectionProperties)
- .processAndWait(miniCluster.getSolrClient(), 30);
+ assertTrue( cmd.process(cloudSolrClient).isSuccess() );
+ } else { // async
+ assertEquals(RequestStatusState.COMPLETED, cmd.processAndWait(cloudSolrClient, 30));
}
+
+ ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
+ AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, zkStateReader, true, true, 330);
+
+ cloudSolrClient.setDefaultCollection(collectionName);
}
public void testSegmentTerminateEarly() throws Exception {
- final String collectionName = "testSegmentTerminateEarlyCollection";
-
final SegmentTerminateEarlyTestState tstes = new SegmentTerminateEarlyTestState(random());
-
final CloudSolrClient cloudSolrClient = cluster.getSolrClient();
- cloudSolrClient.setDefaultCollection(collectionName);
- // create collection
- {
- final String asyncId = (random().nextBoolean() ? null : "asyncId("+collectionName+".create)="+random().nextInt());
- final Map<String, String> collectionProperties = new HashMap<>();
- collectionProperties.put(CoreDescriptor.CORE_CONFIG, "solrconfig-sortingmergepolicyfactory.xml");
- createCollection(cluster, collectionName, null, asyncId, Boolean.TRUE, collectionProperties);
- }
-
- ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, zkStateReader, true, true, 330);
-
// add some documents, then optimize to get merged-sorted segments
tstes.addDocuments(cloudSolrClient, 10, 10, true);
@@ -130,4 +114,71 @@ public class TestSegmentSorting extends SolrCloudTestCase {
tstes.queryTimestampAscendingSegmentTerminateEarlyYes(cloudSolrClient); // uses a sort order that is _not_ compatible with the merge sort order
}
+
+ /**
+ * Verify that atomic updates against our (DVO) segment sort field don't cause errors.
+ * In this situation, the updates should *NOT* be done inplace, because that would
+ * break the index sorting -- so the updated doc is required to show up under a new docid.
+ */
+ public void testAtomicUpdateOfSegmentSortField() throws Exception {
+
+ final CloudSolrClient cloudSolrClient = cluster.getSolrClient();
+ final String updateField = SegmentTerminateEarlyTestState.timestampField;
+
+ // sanity check that updateField is in fact a DocValues only field, meaning it
+ // would normally be eligible for inplace updates -- if it weren't also used for merge sorting
+ final Map<String,Object> schemaOpts
+ = new Field(updateField, params("includeDynamic", "true",
+ "showDefaults","true")).process(cloudSolrClient).getField();
+ assertEquals(true, schemaOpts.get("docValues"));
+ assertEquals(false, schemaOpts.get("indexed"));
+ assertEquals(false, schemaOpts.get("stored"));
+
+ // add some documents
+ final int numDocs = atLeast(1000);
+ for (int id = 1; id <= numDocs; id++) {
+ cloudSolrClient.add(sdoc("id", id, updateField, random().nextInt(60)));
+ }
+ cloudSolrClient.commit();
+
+ // do some random iterations of replacing docs, atomic updates against segment sort field, and commits
+ // (at this point we're just sanity checking no serious failures)
+ for (int iter = 0; iter < 20; iter++) {
+ final int iterSize = atLeast(20);
+ for (int i = 0; i < iterSize; i++) {
+ // replace
+ cloudSolrClient.add(sdoc("id", TestUtil.nextInt(random(), 1, numDocs),
+ updateField, random().nextInt(60)));
+ // atomic update
+ cloudSolrClient.add(sdoc("id", TestUtil.nextInt(random(), 1, numDocs),
+ updateField, map("set", random().nextInt(60))));
+ }
+ cloudSolrClient.commit();
+ }
+
+
+ // pick a random doc, and verify that doing an atomic update causes the docid to change
+ // ie: not an inplace update
+ final int id = TestUtil.nextInt(random(), 1, numDocs);
+ final int oldDocId = (Integer) cloudSolrClient.getById(""+id, params("fl","[docid]")).get("[docid]");
+
+ cloudSolrClient.add(sdoc("id", id, updateField, map("inc","666")));
+ cloudSolrClient.commit();
+
+ // loop in case we're waiting for a newSearcher to be opened
+ int newDocId = -1;
+ int attempts = 10;
+ while ((newDocId < 0) && (0 < attempts--)) {
+ SolrDocumentList docs = cloudSolrClient.query(params("q", "id:"+id,
+ "fl","[docid]",
+ "fq", updateField + ":[666 TO *]")).getResults(); // "field:[range]" -- the ':' is required
+ if (0 < docs.size()) {
+ newDocId = (Integer)docs.get(0).get("[docid]");
+ } else {
+ Thread.sleep(50);
+ }
+ }
+ assertTrue("doc " + id + " was never found with the incremented value", 0 <= newDocId); // -1 must not pass vacuously
+ assertTrue("docid did not change, update was done inplace", oldDocId != newDocId);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java b/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java
new file mode 100644
index 0000000..9f371d4
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestStressInPlaceUpdates.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.math3.primes.Primes;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Slow
+public class TestStressInPlaceUpdates extends AbstractFullDistribZkTestBase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @BeforeClass
+ public static void beforeSuperClass() throws Exception {
+ System.setProperty("solr.tests.intClassName", random().nextBoolean()? "TrieIntField": "IntPointField");
+ System.setProperty("solr.tests.longClassName", random().nextBoolean()? "TrieLongField": "LongPointField");
+ System.setProperty("solr.tests.floatClassName", random().nextBoolean()? "TrieFloatField": "FloatPointField");
+ System.setProperty("solr.tests.doubleClassName", random().nextBoolean()? "TrieDoubleField": "DoublePointField");
+
+ schemaString = "schema-inplace-updates.xml";
+ configString = "solrconfig-tlog.xml";
+
+ // sanity check that autocommits are disabled
+ initCore(configString, schemaString);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxTime);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoCommmitMaxDocs);
+ assertEquals(-1, h.getCore().getSolrConfig().getUpdateHandlerInfo().autoSoftCommmitMaxDocs);
+ }
+
+ @After
+ public void after() {
+ System.clearProperty("solr.tests.intClassName");
+ System.clearProperty("solr.tests.longClassName");
+ System.clearProperty("solr.tests.floatClassName");
+ System.clearProperty("solr.tests.doubleClassName");
+ }
+
+ public TestStressInPlaceUpdates() {
+ super();
+ sliceCount = 1;
+ fixShardCount(3);
+ }
+
+ protected final ConcurrentHashMap<Integer, DocInfo> model = new ConcurrentHashMap<>();
+ protected Map<Integer, DocInfo> committedModel = new HashMap<>();
+ protected long snapshotCount;
+ protected long committedModelClock;
+ protected int clientIndexUsedForCommit;
+ protected volatile int lastId;
+ protected final String field = "val_l";
+
+ private void initModel(int ndocs) {
+ for (int i = 0; i < ndocs; i++) {
+ // seed versions w/-1 so "from scratch" adds/updates will fail optimistic concurrency checks
+ // if some other thread beats us to adding the id
+ model.put(i, new DocInfo(-1L, 0, 0));
+ }
+ committedModel.putAll(model);
+ }
+
+ SolrClient leaderClient = null;
+
+ @Test
+ @ShardsFixed(num = 3)
+ public void stressTest() throws Exception {
+ waitForRecoveriesToFinish(true);
+
+ this.leaderClient = getClientForLeader();
+ assertNotNull("Couldn't obtain client for the leader of the shard", this.leaderClient);
+
+ final int commitPercent = 5 + random().nextInt(20);
+ final int softCommitPercent = 30 + random().nextInt(75); // what percent of the commits are soft
+ final int deletePercent = 4 + random().nextInt(25);
+ final int deleteByQueryPercent = random().nextInt(8);
+ final int ndocs = atLeast(5);
+ int nWriteThreads = 5 + random().nextInt(25);
+ int fullUpdatePercent = 5 + random().nextInt(50);
+
+ // query variables
+ final int percentRealtimeQuery = 75;
+ // number of cumulative read/write operations by all threads
+ final AtomicLong operations = new AtomicLong(25000);
+ int nReadThreads = 5 + random().nextInt(25);
+
+
+ /** // testing
+ final int commitPercent = 5;
+ final int softCommitPercent = 100; // what percent of the commits are soft
+ final int deletePercent = 0;
+ final int deleteByQueryPercent = 50;
+ final int ndocs = 10;
+ int nWriteThreads = 10;
+
+ final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
+
+ // query variables
+ final int percentRealtimeQuery = 101;
+ final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
+ int nReadThreads = 10;
+
+ int fullUpdatePercent = 20;
+ **/
+
+ log.info("{}", Arrays.asList
+ ("commitPercent", commitPercent, "softCommitPercent", softCommitPercent,
+ "deletePercent", deletePercent, "deleteByQueryPercent", deleteByQueryPercent,
+ "ndocs", ndocs, "nWriteThreads", nWriteThreads, "percentRealtimeQuery", percentRealtimeQuery,
+ "operations", operations, "nReadThreads", nReadThreads));
+
+ initModel(ndocs);
+
+ List<Thread> threads = new ArrayList<>();
+
+ for (int i = 0; i < nWriteThreads; i++) {
+ Thread thread = new Thread("WRITER" + i) {
+ Random rand = new Random(random().nextInt());
+
+ @Override
+ public void run() { // writer: until the shared operation budget runs out, randomly commit, delete, fully add, or atomically "inc" a doc
+ try {
+ while (operations.decrementAndGet() > 0) {
+ int oper = rand.nextInt(100);
+
+ if (oper < commitPercent) {
+ Map<Integer, DocInfo> newCommittedModel;
+ long version;
+
+ synchronized (TestStressInPlaceUpdates.this) {
+ // take a snapshot of the model
+ // this is safe to do w/o synchronizing on the model because it's a ConcurrentHashMap
+ newCommittedModel = new HashMap<>(model);
+ version = snapshotCount++;
+
+ int chosenClientIndex = rand.nextInt(clients.size());
+
+ if (rand.nextInt(100) < softCommitPercent) {
+ log.info("softCommit start");
+ clients.get(chosenClientIndex).commit(true, true, true);
+ log.info("softCommit end");
+ } else {
+ log.info("hardCommit start");
+ clients.get(chosenClientIndex).commit();
+ log.info("hardCommit end");
+ }
+
+ // install this model snapshot only if it's newer than the current one
+ if (version >= committedModelClock) {
+ if (VERBOSE) {
+ log.info("installing new committedModel version={}", version); // log the version being installed, not the old clock
+ }
+ clientIndexUsedForCommit = chosenClientIndex;
+ committedModel = newCommittedModel;
+ committedModelClock = version;
+ }
+ }
+ continue;
+ }
+
+ int id;
+
+ if (rand.nextBoolean()) {
+ id = rand.nextInt(ndocs);
+ } else {
+ id = lastId; // reuse the last ID half of the time to force more race conditions
+ }
+
+ // set the lastId before we actually change it sometimes to try and
+ // uncover more race conditions between writing and reading
+ boolean before = rand.nextBoolean();
+ if (before) {
+ lastId = id;
+ }
+
+ DocInfo info = model.get(id);
+
+ // yield after getting the next version to increase the odds of updates happening out of order
+ if (rand.nextBoolean()) Thread.yield();
+
+ if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+ final boolean dbq = (oper >= commitPercent + deletePercent);
+ final String delType = dbq ? "DBQ" : "DBI"; // label must match the kind of delete actually issued below
+ log.info("{} id {}: {}", delType, id, info);
+
+ Long returnedVersion = null;
+
+ try {
+ returnedVersion = deleteDocAndGetVersion(Integer.toString(id), params("_version_", Long.toString(info.version)), dbq);
+ log.info(delType + ": Deleting id=" + id + ", version=" + info.version
+ + ". Returned version=" + returnedVersion);
+ } catch (RuntimeException e) {
+ if (e.getMessage() != null && e.getMessage().contains("version conflict")
+ || e.getMessage() != null && e.getMessage().contains("Conflict")) {
+ // It's okay for a leader to reject a concurrent request
+ log.warn("Conflict during {}, rejected id={}, {}", delType, id, e);
+ returnedVersion = null;
+ } else {
+ throw e;
+ }
+ }
+
+ // only update model if update had no conflict & the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (null != returnedVersion &&
+ (Math.abs(returnedVersion.longValue()) > Math.abs(currInfo.version))) {
+ model.put(id, new DocInfo(returnedVersion.longValue(), 0, 0));
+ }
+ }
+
+ } else {
+ int val1 = info.intFieldValue;
+ long val2 = info.longFieldValue;
+ int nextVal1 = val1;
+ long nextVal2 = val2;
+
+ int addOper = rand.nextInt(100);
+ Long returnedVersion;
+ if (addOper < fullUpdatePercent || info.version <= 0) { // if document was never indexed or was deleted
+ // FULL UPDATE
+ nextVal1 = Primes.nextPrime(val1 + 1);
+ nextVal2 = nextVal1 * 1000000000L; // long arithmetic; keeps val2 an exact multiple of val1
+ try {
+ returnedVersion = addDocAndGetVersion("id", id, "title_s", "title" + id, "val1_i_dvo", nextVal1, "val2_l_dvo", nextVal2, "_version_", info.version);
+ log.info("FULL: Writing id=" + id + ", val=[" + nextVal1 + "," + nextVal2 + "], version=" + info.version + ", Prev was=[" + val1 + "," + val2 + "]. Returned version=" + returnedVersion);
+
+ } catch (RuntimeException e) {
+ if (e.getMessage() != null && e.getMessage().contains("version conflict")
+ || e.getMessage() != null && e.getMessage().contains("Conflict")) {
+ // It's okay for a leader to reject a concurrent request
+ log.warn("Conflict during full update, rejected id={}, {}", id, e);
+ returnedVersion = null;
+ } else {
+ throw e;
+ }
+ }
+ } else {
+ // PARTIAL
+ nextVal2 = val2 + val1;
+ try {
+ returnedVersion = addDocAndGetVersion("id", id, "val2_l_dvo", map("inc", String.valueOf(val1)), "_version_", info.version);
+ log.info("PARTIAL: Writing id=" + id + ", val=[" + nextVal1 + "," + nextVal2 + "], version=" + info.version + ", Prev was=[" + val1 + "," + val2 + "]. Returned version=" + returnedVersion);
+ } catch (RuntimeException e) {
+ if (e.getMessage() != null && e.getMessage().contains("version conflict")
+ || e.getMessage() != null && e.getMessage().contains("Conflict")) {
+ // It's okay for a leader to reject a concurrent request
+ log.warn("Conflict during partial update, rejected id={}, {}", id, e);
+ } else if (e.getMessage() != null && e.getMessage().contains("Document not found for update.")
+ && e.getMessage().contains("id="+id)) {
+ log.warn("Attempted a partial update for a recently deleted document, rejected id={}, {}", id, e);
+ } else {
+ throw e;
+ }
+ returnedVersion = null;
+ }
+ }
+
+ // only update model if update had no conflict & the version is newer
+ synchronized (model) {
+ DocInfo currInfo = model.get(id);
+ if (null != returnedVersion &&
+ (Math.abs(returnedVersion.longValue()) > Math.abs(currInfo.version))) {
+ model.put(id, new DocInfo(returnedVersion.longValue(), nextVal1, nextVal2));
+ }
+
+ }
+ }
+
+ if (!before) {
+ lastId = id;
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L); // make every other reader/writer loop exit as well
+ log.error("", e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ threads.add(thread);
+
+ }
+
+ // Read threads
+ for (int i = 0; i < nReadThreads; i++) {
+ Thread thread = new Thread("READER" + i) {
+ Random rand = new Random(random().nextInt());
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() { // reader: until the shared operation budget runs out, fetch one doc (RTG or search) and check it against the model
+ try {
+ while (operations.decrementAndGet() >= 0) {
+ // bias toward a recently changed doc
+ int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+ // when indexing, we update the index, then the model
+ // so when querying, we should first check the model, and then the index
+
+ boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
+ DocInfo expected;
+
+ if (realTime) {
+ expected = model.get(id);
+ } else {
+ synchronized (TestStressInPlaceUpdates.this) {
+ expected = committedModel.get(id);
+ }
+ }
+
+ if (VERBOSE) {
+ log.info("querying id {}", id);
+ }
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ if (realTime) {
+ params.set("wt", "json");
+ params.set("qt", "/get");
+ params.set("ids", Integer.toString(id));
+ } else {
+ params.set("wt", "json");
+ params.set("q", "id:" + Integer.toString(id));
+ params.set("omitHeader", "true");
+ }
+
+ int clientId = rand.nextInt(clients.size());
+ if (!realTime) clientId = clientIndexUsedForCommit;
+
+ QueryResponse response = clients.get(clientId).query(params);
+ if (response.getResults().size() == 0) {
+ // there's no info we can get back with a delete, so not much we can check without further synchronization
+ } else if (response.getResults().size() == 1) {
+ final SolrDocument actual = response.getResults().get(0);
+ final String msg = "Realtime=" + realTime + ", expected=" + expected + ", actual=" + actual;
+ assertNotNull(msg, actual);
+
+ final Long foundVersion = (Long) actual.getFieldValue("_version_");
+ assertNotNull(msg, foundVersion);
+ assertTrue(msg + "... solr doc has non-positive version???",
+ 0 < foundVersion.longValue());
+ final Integer intVal = (Integer) actual.getFieldValue("val1_i_dvo");
+ assertNotNull(msg, intVal);
+
+ final Long longVal = (Long) actual.getFieldValue("val2_l_dvo");
+ assertNotNull(msg, longVal);
+
+ assertTrue(msg + " ...solr returned older version then model. " +
+ "should not be possible given the order of operations in writer threads",
+ Math.abs(expected.version) <= foundVersion.longValue());
+
+ if (foundVersion.longValue() == expected.version) {
+ assertEquals(msg, expected.intFieldValue, intVal.intValue());
+ assertEquals(msg, expected.longFieldValue, longVal.longValue());
+ }
+
+ // Some things we can assert about any Doc returned from solr,
+ // even if it's newer than our (expected) model information...
+
+ assertTrue(msg + " ...how did a doc in solr get a non positive intVal?",
+ 0 < intVal);
+ assertTrue(msg + " ...how did a doc in solr get a non positive longVal?",
+ 0 < longVal);
+ assertEquals(msg + " ...intVal and longVal in solr doc are internally (modulo) inconsistent w/eachother",
+ 0, (longVal % intVal));
+
+ // NOTE: when foundVersion is greater than the version read from the model,
+ // it's not possible to make any assertions about the field values in solr relative to the
+ // field values in the model -- ie: we can *NOT* assert expected.longFieldVal <= doc.longVal
+ //
+ // it's tempting to think that this would be possible if we changed our model to preserve the
+ // "old" values when doing a delete, but that's still no guarantee because of how optimistic
+ // concurrency works with negative versions: When adding a doc, we can assert that it must not
+ // exist with version<0, but we can't assert that the *reason* it doesn't exist was because of
+ // a delete with the specific version of "-42".
+ // So a writer thread might (1) prep to add a doc for the first time with "intValue=1,_version_=-1",
+ // and that add may succeed and (2) return some version X which is put in the model. but
+ // in between #1 and #2 other threads may have added & deleted the doc repeatedly, updating
+ // the model with intValue=7,_version_=-42, and a reader thread might meanwhile read from the
+ // model before #2 and expect intValue=5, but get intValue=1 from solr (with a greater version)
+
+ } else {
+ fail(String.format(Locale.ENGLISH, "There were more than one result: %s", response)); // String.format needs %s, not SLF4J's {}
+ }
+ }
+ } catch (Throwable e) {
+ operations.set(-1L); // make every other reader/writer loop exit as well
+ log.error("", e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ threads.add(thread);
+ }
+ // Start all threads
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ { // final pass over uncommitted model with RTG
+
+ for (SolrClient client : clients) {
+ for (Map.Entry<Integer,DocInfo> entry : model.entrySet()) {
+ final Integer id = entry.getKey();
+ final DocInfo expected = entry.getValue();
+ final SolrDocument actual = client.getById(id.toString());
+
+ String msg = "RTG: " + id + "=" + expected;
+ if (null == actual) {
+ // a deleted or non-existent document
+ // sanity check of the model agrees...
+ assertTrue(msg + " is deleted/non-existent in Solr, but model has non-neg version",
+ expected.version < 0);
+ assertEquals(msg + " is deleted/non-existent in Solr", expected.intFieldValue, 0);
+ assertEquals(msg + " is deleted/non-existent in Solr", expected.longFieldValue, 0);
+ } else {
+ msg = msg + " <==VS==> " + actual;
+ assertEquals(msg, expected.intFieldValue, actual.getFieldValue("val1_i_dvo"));
+ assertEquals(msg, expected.longFieldValue, actual.getFieldValue("val2_l_dvo"));
+ assertEquals(msg, expected.version, actual.getFieldValue("_version_"));
+ assertTrue(msg + " doc exists in solr, but version is negative???",
+ 0 < expected.version);
+ }
+ }
+ }
+ }
+
+ { // do a final search and compare every result with the model
+
+ // because commits don't provide any sort of concrete versioning (or optimistic concurrency constraints)
+ // there's no way to guarantee that our committedModel matches what was in Solr at the time of the last commit.
+ // It's possible other threads made additional writes to solr before the commit was processed, but after
+ // the committedModel variable was assigned its new value.
+ //
+ // what we can do however, is commit all completed updates, and *then* compare solr search results
+ // against the (new) committed model....
+
+ waitForThingsToLevelOut(30); // NOTE: this does an automatic commit for us & ensures replicas are up to date
+ committedModel = new HashMap<>(model);
+
+ // first, prune the model of any docs that have negative versions
+ // ie: were never actually added, or were ultimately deleted.
+ for (int i = 0; i < ndocs; i++) {
+ DocInfo info = committedModel.get(i);
+ if (info.version < 0) {
+ // first, a quick sanity check of the model itself...
+ assertEquals("Inconsistent int value in model for deleted doc" + i + "=" + info,
+ 0, info.intFieldValue);
+ assertEquals("Inconsistent long value in model for deleted doc" + i + "=" + info,
+ 0L, info.longFieldValue);
+
+ committedModel.remove(i);
+ }
+ }
+
+ for (SolrClient client : clients) {
+ QueryResponse rsp = client.query(params("q","*:*", "sort", "id asc", "rows", ndocs+""));
+ for (SolrDocument actual : rsp.getResults()) {
+ final Integer id = Integer.parseInt(actual.getFieldValue("id").toString());
+ final DocInfo expected = committedModel.get(id);
+
+ assertNotNull("Doc found but missing/deleted from model: " + actual, expected);
+
+ final String msg = "Search: " + id + "=" + expected + " <==VS==> " + actual;
+ assertEquals(msg, expected.intFieldValue, actual.getFieldValue("val1_i_dvo"));
+ assertEquals(msg, expected.longFieldValue, actual.getFieldValue("val2_l_dvo"));
+ assertEquals(msg, expected.version, actual.getFieldValue("_version_"));
+ assertTrue(msg + " doc exists in solr, but version is negative???",
+ 0 < expected.version);
+
+ // also sanity check the model (which we already know matches the doc)
+ assertEquals("Inconsistent (modulo) values in model for id " + id + "=" + expected,
+ 0, (expected.longFieldValue % expected.intFieldValue));
+ }
+ assertEquals(committedModel.size(), rsp.getResults().getNumFound());
+ }
+ }
+ }
+
+ /**
+ * Immutable record of one document's state in the in-memory model (version plus both field values); instances are read by reader and writer threads.
+ */
+ private static class DocInfo {
+ final long version; // positive: version of an existing doc; negative: deleted / never-added marker
+ final int intFieldValue; // model value for val1_i_dvo (0 for deleted / never-added docs)
+ final long longFieldValue; // model value for val2_l_dvo (0 for deleted / never-added docs)
+
+ public DocInfo(long version, int val1, long val2) {
+ assert version != 0; // must either be real positive version, or negative deleted version/indicator
+ this.version = version;
+ this.intFieldValue = val1;
+ this.longFieldValue = val2;
+ }
+
+ @Override
+ public String toString() {
+ return "[version=" + version + ", intValue=" + intFieldValue + ",longValue=" + longFieldValue + "]";
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ protected long addDocAndGetVersion(Object... fields) throws Exception { // adds one doc via leaderClient; returns the version reported under "adds"
+ SolrInputDocument doc = new SolrInputDocument();
+ addFields(doc, fields);
+ // versions=true: the response is read below for the version of the add
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("versions", "true");
+
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(params);
+ ureq.add(doc);
+ UpdateResponse resp;
+
+ // send updates to leader, to avoid SOLR-8733
+ resp = ureq.process(leaderClient);
+ // first value under "adds" is parsed as the version of the single doc sent above
+ long returnedVersion = Long.parseLong(((NamedList) resp.getResponse().get("adds")).getVal(0).toString());
+ assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ + " worked around that problem here.", returnedVersion > 0);
+ return returnedVersion;
+ }
+
+ @SuppressWarnings("rawtypes")
+ protected long deleteDocAndGetVersion(String id, ModifiableSolrParams params, boolean deleteByQuery) throws Exception { // deletes via leaderClient; returns the (negative) version reported
+ params.add("versions", "true"); // NOTE: mutates the caller-supplied params
+
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParams(params);
+ if (deleteByQuery) {
+ ureq.deleteByQuery("id:"+id);
+ } else {
+ ureq.deleteById(id);
+ }
+ UpdateResponse resp;
+ // send updates to leader, to avoid SOLR-8733
+ resp = ureq.process(leaderClient);
+ // the response key holding the version depends on the kind of delete that was sent
+ String key = deleteByQuery? "deleteByQuery": "deletes";
+ long returnedVersion = Long.parseLong(((NamedList) resp.getResponse().get(key)).getVal(0).toString());
+ assertTrue("Due to SOLR-8733, sometimes returned version is 0. Let us assert that we have successfully"
+ + " worked around that problem here.", returnedVersion < 0);
+ return returnedVersion;
+ }
+
+ /**
+ * Returns the entry of {@code clients} whose base URL starts with the base URL of the SHARD1 leader's node, or null if there is no leader or no matching client (workaround for SOLR-8733).
+ */
+ public SolrClient getClientForLeader() throws KeeperException, InterruptedException {
+ final ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+ zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
+ final ClusterState clusterState = zkStateReader.getClusterState();
+ final Slice shard1 = clusterState.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1);
+ final Replica leader = shard1.getLeader();
+ if (leader == null) return null; // no leader known: let the caller's assertNotNull report it instead of an NPE here
+ final String leaderBaseUrl = zkStateReader.getBaseUrlForNodeName(leader.getNodeName()); // loop-invariant, computed once
+ for (SolrClient client : clients) {
+ if (((HttpSolrClient) client).getBaseURL().startsWith(leaderBaseUrl)) {
+ return client;
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/53754108/solr/core/src/test/org/apache/solr/search/TestRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 15aed5d..29efa52 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -25,9 +25,14 @@ import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import org.apache.solr.metrics.SolrMetricManager;
import org.noggit.ObjectBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateHandler;
@@ -37,6 +42,7 @@ import org.junit.Test;
import java.io.File;
import java.io.RandomAccessFile;
+import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayDeque;
@@ -53,6 +59,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
public class TestRecovery extends SolrTestCaseJ4 {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// means that we've seen the leader and have version info (i.e. we are a non-leader replica)
private static String FROM_LEADER = DistribPhase.FROMLEADER.toString();
@@ -67,6 +74,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
savedFactory = System.getProperty("solr.DirectoryFactory");
System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockFSDirectoryFactory");
initCore("solrconfig-tlog.xml","schema15.xml");
+
+ // validate that the schema was not changed to an unexpected state
+ IndexSchema schema = h.getCore().getLatestSchema();
+ assertTrue(schema.getFieldOrNull("_version_").hasDocValues() && !schema.getFieldOrNull("_version_").indexed()
+ && !schema.getFieldOrNull("_version_").stored());
+
}
@AfterClass
@@ -86,6 +99,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
@Test
public void testLogReplay() throws Exception {
+
try {
DirectUpdateHandler2.commitOnClose = false;
@@ -112,7 +126,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
versions.addFirst(addAndGetVersion(sdoc("id", "A12"), null));
versions.addFirst(deleteByQueryAndGetVersion("id:A11", null));
versions.addFirst(addAndGetVersion(sdoc("id", "A13"), null));
-
+ versions.addFirst(addAndGetVersion(sdoc("id", "A12", "val_i_dvo", map("set", 1)), null)); // atomic update
+ versions.addFirst(addAndGetVersion(sdoc("id", "A12", "val_i_dvo", map("set", 2)), null)); // in-place update
assertJQ(req("q","*:*"),"/response/numFound==0");
assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
@@ -151,10 +166,11 @@ public class TestRecovery extends SolrTestCaseJ4 {
// wait until recovery has finished
assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+ assertJQ(req("q","val_i_dvo:2") ,"/response/numFound==1"); // assert that in-place update is retained
assertJQ(req("q","*:*") ,"/response/numFound==3");
- assertEquals(5L, replayDocs.getCount() - initialOps);
+ assertEquals(7L, replayDocs.getCount() - initialOps);
assertEquals(UpdateLog.State.ACTIVE.ordinal(), state.getValue().intValue());
// make sure we can still access versions after recovery
@@ -166,6 +182,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertU(adoc("id","A4"));
assertJQ(req("q","*:*") ,"/response/numFound==3");
+ assertJQ(req("q","val_i_dvo:2") ,"/response/numFound==1"); // assert that in-place update is retained
h.close();
createCore();
@@ -185,6 +202,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
// h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
assertJQ(req("q","*:*") ,"/response/numFound==5");
+ assertJQ(req("q","val_i_dvo:2") ,"/response/numFound==1"); // assert that in-place update is retained
Thread.sleep(100);
assertEquals(permits, logReplay.availablePermits()); // no updates, so insure that recovery didn't run
@@ -1258,6 +1276,133 @@ public class TestRecovery extends SolrTestCaseJ4 {
}
}
+ @Test
+ public void testLogReplayWithInPlaceUpdatesAndDeletes() throws Exception {
+ // Log replay of in-place updates followed by DBQ/DBI: deleted docs must stay deleted, the surviving doc keeps its updated value.
+ try {
+ // closing the core must not commit, so the operations below have to come back through log replay
+ DirectUpdateHandler2.commitOnClose = false;
+ final Semaphore logReplay = new Semaphore(0);
+ final Semaphore logReplayFinish = new Semaphore(0);
+ // log replay blocks in this hook until the test releases permits on logReplay
+ UpdateLog.testing_logReplayHook = () -> {
+ try {
+ assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ // the finish hook releases one permit on logReplayFinish, which the test waits on below
+ UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();
+
+ // start from an empty, committed index
+ clearIndex();
+ assertU(commit());
+ // versions of all operations below, most recent first
+ Deque<Long> versions = new ArrayDeque<>();
+ versions.addFirst(addAndGetVersion(sdoc("id", "A1"), null));
+
+ // DBQ (delete-by-query) of updated document using id
+ versions.addFirst(addAndGetVersion(sdoc("id", "A2", "val_i_dvo", "1"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "A2", "val_i_dvo", map("set", 2)), null)); // in-place update
+ versions.addFirst(deleteByQueryAndGetVersion("id:A2", null));
+
+ // DBQ of updated document using updated value
+ versions.addFirst(addAndGetVersion(sdoc("id", "A3", "val_i_dvo", "101"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "A3", "val_i_dvo", map("set", 102)), null)); // in-place update
+ versions.addFirst(deleteByQueryAndGetVersion("val_i_dvo:102", null));
+
+ // DBQ using an intermediate update value (shouldn't delete anything): A4 goes 200 -> 201 -> 202 before the query for 201 is sent
+ versions.addFirst(addAndGetVersion(sdoc("id", "A4", "val_i_dvo", "200"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "A4", "val_i_dvo", map("inc", "1")), null)); // in-place update
+ versions.addFirst(addAndGetVersion(sdoc("id", "A4", "val_i_dvo", map("inc", "1")), null)); // in-place update
+ versions.addFirst(deleteByQueryAndGetVersion("val_i_dvo:201", null));
+
+ // DBI (delete-by-id) of updated document
+ versions.addFirst(addAndGetVersion(sdoc("id", "A5", "val_i_dvo", "300"), null));
+ versions.addFirst(addAndGetVersion(sdoc("id", "A5", "val_i_dvo", map("inc", "1")), null)); // in-place update
+ versions.addFirst(addAndGetVersion(sdoc("id", "A5", "val_i_dvo", map("inc", "1")), null)); // in-place update
+ versions.addFirst(deleteAndGetVersion("A5", null));
+ // nothing above was committed, so searches see no documents yet
+ assertJQ(req("q","*:*"),"/response/numFound==0");
+
+ // the versions reported by /get must equal the ones recorded above, in the same order
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
+
+ h.close();
+ createCore();
+
+ // Solr should kick this off now
+ // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
+
+ // verify that previous close didn't do a commit
+ // recovery should be blocked by our hook
+ assertJQ(req("q","*:*") ,"/response/numFound==0");
+
+ // make sure we can still access versions after a restart
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);
+
+ // unblock recovery
+ logReplay.release(1000);
+
+ // make sure we can still access versions during recovery
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);
+
+ // wait until recovery has finished
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+ assertJQ(req("q","val_i_dvo:202") ,"/response/numFound==1"); // assert that in-place update is retained (A4: 200 -> 202)
+ // only A1 and A4 survive; A2, A3 and A5 were deleted after their in-place updates
+ assertJQ(req("q","*:*") ,"/response/numFound==2");
+ assertJQ(req("q","id:A2") ,"/response/numFound==0");
+ assertJQ(req("q","id:A3") ,"/response/numFound==0");
+ assertJQ(req("q","id:A4") ,"/response/numFound==1");
+ assertJQ(req("q","id:A5") ,"/response/numFound==0");
+
+ // make sure we can still access versions after recovery
+ assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
+ // add one more document without committing it, then restart again
+ assertU(adoc("id","A10"));
+
+ h.close();
+ createCore();
+ // Solr should kick this off now
+ // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
+
+ // wait until recovery has finished
+ assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
+ assertJQ(req("q","*:*") ,"/response/numFound==3");
+ assertJQ(req("q","id:A2") ,"/response/numFound==0");
+ assertJQ(req("q","id:A3") ,"/response/numFound==0");
+ assertJQ(req("q","id:A4") ,"/response/numFound==1");
+ assertJQ(req("q","id:A5") ,"/response/numFound==0");
+ assertJQ(req("q","id:A10"),"/response/numFound==1");
+
+ // no updates, so ensure that recovery does not run
+ h.close();
+ int permits = logReplay.availablePermits();
+ createCore();
+ // Solr should kick this off now
+ // h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
+
+ assertJQ(req("q","*:*") ,"/response/numFound==3");
+ assertJQ(req("q","val_i_dvo:202") ,"/response/numFound==1"); // assert that in-place update is retained
+ assertJQ(req("q","id:A2") ,"/response/numFound==0");
+ assertJQ(req("q","id:A3") ,"/response/numFound==0");
+ assertJQ(req("q","id:A4") ,"/response/numFound==1");
+ assertJQ(req("q","id:A5") ,"/response/numFound==0");
+ assertJQ(req("q","id:A10"),"/response/numFound==1");
+ Thread.sleep(100); // presumably gives an unexpected replay time to start before the permit check - TODO confirm
+ assertEquals(permits, logReplay.availablePermits()); // no updates, so ensure that recovery didn't run (the replay hook would have taken a permit)
+
+ assertEquals(UpdateLog.State.ACTIVE, h.getCore().getUpdateHandler().getUpdateLog().getState());
+
+ } finally { // always reset the static commit-on-close flag and replay hooks changed above
+ DirectUpdateHandler2.commitOnClose = true;
+ UpdateLog.testing_logReplayHook = null;
+ UpdateLog.testing_logReplayFinishHook = null;
+ }
+
+ }
// NOTE: replacement must currently be same size
private static void findReplace(byte[] from, byte[] to, byte[] data) {