You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/04/30 20:31:54 UTC
[lucene-solr] branch master updated: SOLR-12833: Avoid unnecessary
memory cost when DistributedUpdateProcessor timed-out lock is not used.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 70e0907 SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.
70e0907 is described below
commit 70e090717d21a9713c45b8a52e35df940a4277df
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Apr 30 22:31:06 2019 +0200
SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.
---
solr/CHANGES.txt | 3 +
...{VersionBucket.java => TimedVersionBucket.java} | 56 +--
.../java/org/apache/solr/update/VersionBucket.java | 43 +-
.../java/org/apache/solr/update/VersionInfo.java | 19 +-
.../processor/DistributedUpdateProcessor.java | 480 +++++++++++----------
.../processor/DistributedUpdateProcessorTest.java | 163 ++++++-
.../src/updatehandlers-in-solrconfig.adoc | 24 ++
7 files changed, 483 insertions(+), 305 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ab5e588..e8d455b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -266,6 +266,9 @@ Improvements
* SOLR-13432: Add .toString methods to BitDocSet and SortedIntDocSet so that enabling "showItems" on the filter caches
shows some useful information about the values in the cache. (shalin)
+* SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.
+ (jefferyyuan, ab)
+
Other Changes
----------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
similarity index 67%
copy from solr/core/src/java/org/apache/solr/update/VersionBucket.java
copy to solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
index 066936a..b8af912 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
@@ -16,44 +16,35 @@
*/
package org.apache.solr.update;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-// TODO: make inner?
-// TODO: store the highest possible in the index on a commit (but how to not block adds?)
-// TODO: could also store highest possible in the transaction log after a commit.
-// Or on a new index, just scan "version" for the max?
-/** @lucene.internal */
-public class VersionBucket {
- private int lockTimeoutMs;
+import org.apache.solr.common.SolrException;
- public VersionBucket(int lockTimeoutMs) {
- this.lockTimeoutMs = lockTimeoutMs;
- }
+/** @lucene.internal */
+/**
+ * This implementation uses lock and condition and will throw exception if it can't obtain the lock within
+ * <code>lockTimeoutMs</code>.
+ */
+public class TimedVersionBucket extends VersionBucket {
private final Lock lock = new ReentrantLock(true);
private final Condition condition = lock.newCondition();
- public long highest;
-
- public void updateHighest(long val) {
- if (highest != 0) {
- highest = Math.max(highest, Math.abs(val));
- }
- }
-
- public int getLockTimeoutMs() {
- return lockTimeoutMs;
- }
-
- public boolean tryLock() {
- try {
- return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ /**
+ * This will run the function with the lock. It will throw exception if it can't obtain the lock within
+ * <code>lockTimeoutMs</code>.
+ */
+ @Override
+ public <T,R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function) throws IOException {
+ if (tryLock(lockTimeoutMs)) {
+ return function.apply();
+ } else {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to get version bucket lock in " + lockTimeoutMs + " ms");
}
}
@@ -73,4 +64,13 @@ public class VersionBucket {
throw new RuntimeException(e);
}
}
+
+ protected boolean tryLock(int lockTimeoutMs) {
+ try {
+ return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
index 066936a..96ea1ca 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
@@ -16,26 +16,19 @@
*/
package org.apache.solr.update;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
// TODO: make inner?
// TODO: store the highest possible in the index on a commit (but how to not block adds?)
// TODO: could also store highest possible in the transaction log after a commit.
// Or on a new index, just scan "version" for the max?
/** @lucene.internal */
+/**
+ * The default implementation which uses the intrinsic object monitor.
+ * It uses less memory but ignores the <code>lockTimeoutMs</code>.
+ */
public class VersionBucket {
- private int lockTimeoutMs;
-
- public VersionBucket(int lockTimeoutMs) {
- this.lockTimeoutMs = lockTimeoutMs;
- }
-
- private final Lock lock = new ReentrantLock(true);
- private final Condition condition = lock.newCondition();
-
public long highest;
public void updateHighest(long val) {
@@ -44,32 +37,34 @@ public class VersionBucket {
}
}
- public int getLockTimeoutMs() {
- return lockTimeoutMs;
+ @FunctionalInterface
+ public interface CheckedFunction<T, R> {
+ R apply() throws IOException;
}
- public boolean tryLock() {
- try {
- return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ /**
+ * This will run the function with the intrinsic object monitor.
+ */
+ public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T, R> function) throws IOException {
+ synchronized (this) {
+ return function.apply();
}
}
+ /**
+ * Nothing to do for the intrinsic object monitor.
+ */
public void unlock() {
- lock.unlock();
}
public void signalAll() {
- condition.signalAll();
+ notifyAll();
}
public void awaitNanos(long nanosTimeout) {
try {
- condition.awaitNanos(nanosTimeout);
+ wait(TimeUnit.NANOSECONDS.toMillis(nanosTimeout));
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index dd39043..a0cdcda 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -48,15 +48,9 @@ public class VersionInfo {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = "bucketVersionLockTimeoutMs";
- /**
- * same as default client read timeout: 10 mins
- */
- private static final int DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS = 600000;
-
private final UpdateLog ulog;
private final VersionBucket[] buckets;
private SchemaField versionField;
- private SchemaField idField;
final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private int versionBucketLockTimeoutMs;
@@ -100,14 +94,21 @@ public class VersionInfo {
this.ulog = ulog;
IndexSchema schema = ulog.uhandler.core.getLatestSchema();
versionField = getAndCheckVersionField(schema);
- idField = schema.getUniqueKeyField();
versionBucketLockTimeoutMs = ulog.uhandler.core.getSolrConfig().getInt("updateHandler/versionBucketLockTimeoutMs",
- Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "" + DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS)));
+ Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "0")));
buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
for (int i=0; i<buckets.length; i++) {
- buckets[i] = new VersionBucket(versionBucketLockTimeoutMs);
+ if (versionBucketLockTimeoutMs > 0) {
+ buckets[i] = new TimedVersionBucket();
+ } else {
+ buckets[i] = new VersionBucket();
+ }
}
}
+
+ public int getVersionBucketLockTimeoutMs() {
+ return versionBucketLockTimeoutMs;
+ }
public void reload() {
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index bf1255a..ed48c75 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -124,7 +124,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final AtomicUpdateDocumentMerger docMerger;
private final UpdateLog ulog;
- private final VersionInfo vinfo;
+ @VisibleForTesting
+ VersionInfo vinfo;
private final boolean versionsStored;
private boolean returnVersions;
@@ -331,166 +332,170 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
vinfo.lockForUpdate();
- if (bucket.tryLock()) {
- try {
- bucket.signalAll();
- // just in case anyone is waiting let them know that we have a new update
- // we obtain the version when synchronized and then do the add so we can ensure that
- // if version1 < version2 then version1 is actually added before version2.
-
- // even if we don't store the version field, synchronizing on the bucket
- // will enable us to know what version happened first, and thus enable
- // realtime-get to work reliably.
- // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
- // there may be other reasons in the future for a version on the commands
-
- if (versionsStored) {
-
- long bucketVersion = bucket.highest;
-
- if (leaderLogic) {
+ try {
+ long finalVersionOnUpdate = versionOnUpdate;
+ return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionAdd(cmd, finalVersionOnUpdate, isReplayOrPeersync, leaderLogic, forwardedFromCollection, bucket));
+ } finally {
+ vinfo.unlockForUpdate();
+ }
+ }
- if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
- // forwarded from a collection but we are not buffering so strip original version and apply our own
- // see SOLR-5308
- log.info("Removing version field from doc: " + cmd.getPrintableId());
- cmd.solrDoc.remove(CommonParams.VERSION_FIELD);
- versionOnUpdate = 0;
- }
+ private boolean doVersionAdd(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync,
+ boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket) throws IOException {
+ try {
+ BytesRef idBytes = cmd.getIndexedId();
+ bucket.signalAll();
+ // just in case anyone is waiting let them know that we have a new update
+ // we obtain the version when synchronized and then do the add so we can ensure that
+ // if version1 < version2 then version1 is actually added before version2.
+
+ // even if we don't store the version field, synchronizing on the bucket
+ // will enable us to know what version happened first, and thus enable
+ // realtime-get to work reliably.
+ // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
+ // there may be other reasons in the future for a version on the commands
+
+ if (versionsStored) {
+
+ long bucketVersion = bucket.highest;
+
+ if (leaderLogic) {
+
+ if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
+ // forwarded from a collection but we are not buffering so strip original version and apply our own
+ // see SOLR-5308
+ log.info("Removing version field from doc: " + cmd.getPrintableId());
+ cmd.solrDoc.remove(CommonParams.VERSION_FIELD);
+ versionOnUpdate = 0;
+ }
- getUpdatedDocument(cmd, versionOnUpdate);
+ getUpdatedDocument(cmd, versionOnUpdate);
- // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
- if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
- && isReplayOrPeersync == false) {
- // we're not in an active state, and this update isn't from a replay, so buffer it.
- log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
- cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
- ulog.add(cmd);
- return true;
- }
+ // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
+ if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
+ && isReplayOrPeersync == false) {
+ // we're not in an active state, and this update isn't from a replay, so buffer it.
+ log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+ ulog.add(cmd);
+ return true;
+ }
- if (versionOnUpdate != 0) {
- Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- long foundVersion = lastVersion == null ? -1 : lastVersion;
- if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
- || (versionOnUpdate == 1 && foundVersion > 0)) {
- // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
- // specified it must exist (versionOnUpdate==1) and it does.
- } else {
- throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
- + " expected=" + versionOnUpdate + " actual=" + foundVersion);
- }
+ if (versionOnUpdate != 0) {
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ long foundVersion = lastVersion == null ? -1 : lastVersion;
+ if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
+ || (versionOnUpdate == 1 && foundVersion > 0)) {
+ // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
+ // specified it must exist (versionOnUpdate==1) and it does.
+ } else {
+ throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
+ + " expected=" + versionOnUpdate + " actual=" + foundVersion);
}
+ }
- long version = vinfo.getNewClock();
- cmd.setVersion(version);
- cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
- bucket.updateHighest(version);
- } else {
- // The leader forwarded us this update.
- cmd.setVersion(versionOnUpdate);
-
- if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) {
- // we're not in an active state, and this update isn't from a replay, so buffer it.
- cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
- ulog.add(cmd);
- return true;
- }
+ long version = vinfo.getNewClock();
+ cmd.setVersion(version);
+ cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
+ bucket.updateHighest(version);
+ } else {
+ // The leader forwarded us this update.
+ cmd.setVersion(versionOnUpdate);
+
+ if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) {
+ // we're not in an active state, and this update isn't from a replay, so buffer it.
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+ ulog.add(cmd);
+ return true;
+ }
- if (cmd.isInPlaceUpdate()) {
- long prev = cmd.prevVersion;
- Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- if (lastVersion == null || Math.abs(lastVersion) < prev) {
- // this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
- // So we shouldn't be here, unless what must've happened is:
- // by the time synchronization block was entered, the prev update was deleted by DBQ. Since
- // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
- // from the deleted list (which might be older than the prev update!)
- UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
-
- if (fetchedFromLeader instanceof DeleteUpdateCommand) {
- log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
- + " was deleted at the leader subsequently.", idBytes.utf8ToString());
- versionDelete((DeleteUpdateCommand) fetchedFromLeader);
- return true;
- } else {
- assert fetchedFromLeader instanceof AddUpdateCommand;
- // Newer document was fetched from the leader. Apply that document instead of this current in-place
- // update.
- log.info(
- "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
- idBytes.utf8ToString(), fetchedFromLeader);
-
- // Make this update to become a non-inplace update containing the full document obtained from the
- // leader
- cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
- cmd.prevVersion = -1;
- cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
- assert cmd.isInPlaceUpdate() == false;
- }
+ if (cmd.isInPlaceUpdate()) {
+ long prev = cmd.prevVersion;
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion == null || Math.abs(lastVersion) < prev) {
+ // this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
+ // So we shouldn't be here, unless what must've happened is:
+ // by the time synchronization block was entered, the prev update was deleted by DBQ. Since
+ // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
+ // from the deleted list (which might be older than the prev update!)
+ UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
+
+ if (fetchedFromLeader instanceof DeleteUpdateCommand) {
+ log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
+ + " was deleted at the leader subsequently.", idBytes.utf8ToString());
+ versionDelete((DeleteUpdateCommand) fetchedFromLeader);
+ return true;
} else {
- if (lastVersion != null && Math.abs(lastVersion) > prev) {
- // this means we got a newer full doc update and in that case it makes no sense to apply the older
- // inplace update. Drop this update
- log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
- + ". Dropping current update.");
- return true;
- } else {
- // We're good, we should apply this update. First, update the bucket's highest.
- if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
- bucket.updateHighest(versionOnUpdate);
- }
- }
+ assert fetchedFromLeader instanceof AddUpdateCommand;
+ // Newer document was fetched from the leader. Apply that document instead of this current in-place
+ // update.
+ log.info(
+ "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
+ idBytes.utf8ToString(), fetchedFromLeader);
+
+ // Make this update to become a non-inplace update containing the full document obtained from the
+ // leader
+ cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
+ cmd.prevVersion = -1;
+ cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
+ assert cmd.isInPlaceUpdate() == false;
}
} else {
- // if we aren't the leader, then we need to check that updates were not re-ordered
- if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
- // we're OK... this update has a version higher than anything we've seen
- // in this bucket so far, so we know that no reordering has yet occurred.
- bucket.updateHighest(versionOnUpdate);
+ if (lastVersion != null && Math.abs(lastVersion) > prev) {
+ // this means we got a newer full doc update and in that case it makes no sense to apply the older
+ // inplace update. Drop this update
+ log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
+ + ". Dropping current update.");
+ return true;
} else {
- // there have been updates higher than the current update. we need to check
- // the specific version for this id.
- Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
- log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
- return true;
+ // We're good, we should apply this update. First, update the bucket's highest.
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ bucket.updateHighest(versionOnUpdate);
}
}
}
- if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
- cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+ } else {
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+ return true;
+ }
}
}
+ if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+ }
}
+ }
- SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
+ SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
- // TODO: possibly set checkDeleteByQueries as a flag on the command?
- doLocalAdd(cmd);
+ // TODO: possibly set checkDeleteByQueries as a flag on the command?
+ doLocalAdd(cmd);
- // if the update updates a doc that is part of a nested structure,
- // force open a realTimeSearcher to trigger a ulog cache refresh.
- // This refresh makes RTG handler aware of this update.q
- if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
- ulog.openRealtimeSearcher();
- }
-
- if (clonedDoc != null) {
- cmd.solrDoc = clonedDoc;
- }
- } finally {
- bucket.unlock();
- vinfo.unlockForUpdate();
+ // if the update updates a doc that is part of a nested structure,
+ // force open a realTimeSearcher to trigger a ulog cache refresh.
+ // This refresh makes RTG handler aware of this update.q
+ if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
+ ulog.openRealtimeSearcher();
}
- return false;
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
+ if (clonedDoc != null) {
+ cmd.solrDoc = clonedDoc;
+ }
+ } finally {
+ bucket.unlock();
}
+ return false;
}
/**
@@ -527,31 +532,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
vinfo.lockForUpdate();
- if (bucket.tryLock()) {
- try {
- Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
- lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
-
- if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
- log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
- (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
- isReplayOrPeersync, cmd.getPrintableId());
- }
-
- while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
- bucket.awaitNanos(waitTimeout.timeLeft(TimeUnit.NANOSECONDS));
- lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
- lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
- }
- } finally {
-
- bucket.unlock();
-
- vinfo.unlockForUpdate();
- }
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
+ try {
+ lastFoundVersion = bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doWaitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket, waitTimeout));
+ } finally {
+ vinfo.unlockForUpdate();
}
if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
@@ -590,6 +574,33 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return missingUpdate.getVersion();
}
+ private long doWaitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, VersionBucket bucket,
+ TimeOut waitTimeout) {
+ long lastFoundVersion;
+ try {
+ Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
+
+ if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
+ log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
+ (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
+ isReplayOrPeersync, cmd.getPrintableId());
+ }
+
+ while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
+ long timeLeftInNanos = waitTimeout.timeLeft(TimeUnit.NANOSECONDS);
+ if(timeLeftInNanos > 0) { // 0 means: wait forever until notified, but we don't want that.
+ bucket.awaitNanos(timeLeftInNanos);
+ }
+ lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
+ }
+ } finally {
+ bucket.unlock();
+ }
+ return lastFoundVersion;
+ }
+
/**
* This method is used when an update on which a particular in-place update has been lost for some reason. This method
* sends a request to the shard leader to fetch the latest full document as seen on the leader.
@@ -938,87 +949,94 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
VersionBucket bucket = vinfo.bucket(bucketHash);
vinfo.lockForUpdate();
- if (bucket.tryLock()) {
- try {
- if (versionsStored) {
- long bucketVersion = bucket.highest;
+ try {
+ long finalVersionOnUpdate = versionOnUpdate;
+ return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionDelete(cmd, finalVersionOnUpdate, signedVersionOnUpdate, isReplayOrPeersync, leaderLogic,
+ forwardedFromCollection, bucket));
+ } finally {
+ vinfo.unlockForUpdate();
+ }
+ }
- if (leaderLogic) {
+ private boolean doVersionDelete(DeleteUpdateCommand cmd, long versionOnUpdate, long signedVersionOnUpdate,
+ boolean isReplayOrPeersync, boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket)
+ throws IOException {
+ try {
+ BytesRef idBytes = cmd.getIndexedId();
+ if (versionsStored) {
+ long bucketVersion = bucket.highest;
- if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
- // forwarded from a collection but we are not buffering so strip original version and apply our own
- // see SOLR-5308
- log.info("Removing version field from doc: " + cmd.getId());
- versionOnUpdate = signedVersionOnUpdate = 0;
- }
+ if (leaderLogic) {
- // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
- if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
- && !isReplayOrPeersync) {
- // we're not in an active state, and this update isn't from a replay, so buffer it.
- log.info("Leader logic applied but update log is buffering: " + cmd.getId());
- cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
- ulog.delete(cmd);
- return true;
- }
+ if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
+ // forwarded from a collection but we are not buffering so strip original version and apply our own
+ // see SOLR-5308
+ log.info("Removing version field from doc: " + cmd.getId());
+ versionOnUpdate = signedVersionOnUpdate = 0;
+ }
- if (signedVersionOnUpdate != 0) {
- Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- long foundVersion = lastVersion == null ? -1 : lastVersion;
- if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
- || (signedVersionOnUpdate == 1 && foundVersion > 0)) {
- // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
- // specified it must exist (versionOnUpdate==1) and it does.
- } else {
- throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
- + signedVersionOnUpdate + " actual=" + foundVersion);
- }
+ // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
+ if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
+ && !isReplayOrPeersync) {
+ // we're not in an active state, and this update isn't from a replay, so buffer it.
+ log.info("Leader logic applied but update log is buffering: " + cmd.getId());
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+ ulog.delete(cmd);
+ return true;
+ }
+
+ if (signedVersionOnUpdate != 0) {
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ long foundVersion = lastVersion == null ? -1 : lastVersion;
+ if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
+ || (signedVersionOnUpdate == 1 && foundVersion > 0)) {
+ // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
+ // specified it must exist (versionOnUpdate==1) and it does.
+ } else {
+ throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
+ + signedVersionOnUpdate + " actual=" + foundVersion);
}
+ }
- long version = vinfo.getNewClock();
- cmd.setVersion(-version);
- bucket.updateHighest(version);
- } else {
- cmd.setVersion(-versionOnUpdate);
+ long version = vinfo.getNewClock();
+ cmd.setVersion(-version);
+ bucket.updateHighest(version);
+ } else {
+ cmd.setVersion(-versionOnUpdate);
+
+ if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
+ // we're not in an active state, and this update isn't from a replay, so buffer it.
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+ ulog.delete(cmd);
+ return true;
+ }
- if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
- // we're not in an active state, and this update isn't from a replay, so buffer it.
- cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
- ulog.delete(cmd);
+ // if we aren't the leader, then we need to check that updates were not re-ordered
+ if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+ // we're OK... this update has a version higher than anything we've seen
+ // in this bucket so far, so we know that no reordering has yet occurred.
+ bucket.updateHighest(versionOnUpdate);
+ } else {
+ // there have been updates higher than the current update. we need to check
+ // the specific version for this id.
+ Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+ if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+ // This update is a repeat, or was reordered. We need to drop this update.
+ log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
return true;
}
+ }
- // if we aren't the leader, then we need to check that updates were not re-ordered
- if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
- // we're OK... this update has a version higher than anything we've seen
- // in this bucket so far, so we know that no reordering has yet occurred.
- bucket.updateHighest(versionOnUpdate);
- } else {
- // there have been updates higher than the current update. we need to check
- // the specific version for this id.
- Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
- if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
- log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
- return true;
- }
- }
-
- if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
- cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
- }
+ if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
-
- doLocalDelete(cmd);
- return false;
- } finally {
- bucket.unlock();
- vinfo.unlockForUpdate();
}
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
+
+ doLocalDelete(cmd);
+ return false;
+ } finally {
+ bucket.unlock();
}
}
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index a4c54d1..33820ac 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -17,36 +17,173 @@
package org.apache.solr.update.processor;
+import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.TimedVersionBucket;
import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
+ @Rule
+ public MockitoRule rule = MockitoJUnit.rule();
+ private static ExecutorService executor;
+
@BeforeClass
public static void beforeClass() throws Exception {
- initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal.xml");
+ executor = Executors.newCachedThreadPool();
+ initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal-with-another-uniqkey.xml");
+ }
+
+ @AfterClass
+ public static void AfterClass() {
+ executor.shutdown();
}
@Test
- public void testShouldBufferUpdateZk() {
+ public void testShouldBufferUpdateZk() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
- DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
- req, null, null, null);
- AddUpdateCommand cmd = new AddUpdateCommand(req);
- // applying buffer updates, isReplayOrPeerSync flag doesn't matter
- assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
- assertFalse(processor.shouldBufferUpdate(cmd, true, UpdateLog.State.APPLYING_BUFFERED));
-
- assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.BUFFERING));
- // this is not an buffer updates and it depend on other updates
- cmd.prevVersion = 10;
- assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
+ try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
+ req, null, null, null)) {
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ // applying buffer updates, isReplayOrPeerSync flag doesn't matter
+ assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
+ assertFalse(processor.shouldBufferUpdate(cmd, true, UpdateLog.State.APPLYING_BUFFERED));
+
+ assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.BUFFERING));
+      // this is not a buffered update and it depends on other updates
+ cmd.prevVersion = 10;
+ assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
+ }
+ }
+
+ @Test
+ public void testVersionAdd() throws IOException {
+ SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
+ int threads = 5;
+ Function<DistributedUpdateProcessor,Boolean> versionAddFunc = (DistributedUpdateProcessor process) -> {
+ try {
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ cmd.solrDoc = new SolrInputDocument();
+ cmd.solrDoc.setField("notid", "10");
+ return process.versionAdd(cmd);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ int succeeded = runCommands(threads, 1000, req, versionAddFunc);
+ // only one should succeed
+ assertThat(succeeded, is(1));
+
+ succeeded = runCommands(threads, -1, req, versionAddFunc);
+ // all should succeed
+ assertThat(succeeded, is(threads));
}
+ @Test
+ public void testVersionDelete() throws IOException {
+ SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
+
+ int threads = 5;
+ Function<DistributedUpdateProcessor,Boolean> versionDeleteFunc = (DistributedUpdateProcessor process) -> {
+ try {
+ DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+ cmd.id = "1";
+ return process.versionDelete(cmd);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ int succeeded = runCommands(threads, 1000, req, versionDeleteFunc);
+ // only one should succeed
+ assertThat(succeeded, is(1));
+
+ succeeded = runCommands(threads, -1, req, versionDeleteFunc);
+ // all should succeed
+ assertThat(succeeded, is(threads));
+ }
+
+ /**
+ * @return how many requests succeeded
+ */
+ private int runCommands(int threads, int versionBucketLockTimeoutMs, SolrQueryRequest req,
+ Function<DistributedUpdateProcessor,Boolean> function)
+ throws IOException {
+ try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
+ req, null, null, null)) {
+ if (versionBucketLockTimeoutMs > 0) {
+ // use TimedVersionBucket with versionBucketLockTimeoutMs
+ VersionInfo vinfo = Mockito.spy(processor.vinfo);
+ processor.vinfo = vinfo;
+
+ doReturn(new TimedVersionBucket() {
+ /**
+ * simulate the case: it takes 5 seconds to add the doc
+ *
+ */
+ @Override
+ protected boolean tryLock(int lockTimeoutMs) {
+ boolean locked = super.tryLock(versionBucketLockTimeoutMs);
+ if (locked) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return locked;
+ }
+ }).when(vinfo).bucket(anyInt());
+ }
+ CountDownLatch latch = new CountDownLatch(1);
+ Collection<Future<Boolean>> futures = new ArrayList<>();
+ for (int t = 0; t < threads; ++t) {
+ futures.add(executor.submit(() -> {
+ latch.await();
+ return function.apply(processor);
+ }));
+ }
+ latch.countDown();
+
+ int succeeded = 0;
+ for (Future<Boolean> f : futures) {
+ try {
+ f.get();
+ succeeded++;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ // do nothing
+ }
+ }
+ return succeeded;
+ }
+ }
}
diff --git a/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc b/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc
index b0f3e3a..7d7132d 100644
--- a/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc
+++ b/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc
@@ -131,3 +131,27 @@ An example, to be included under `<config><updateHandler>` in `solrconfig.xml`,
<int name="numVersionBuckets">65536</int>
</updateLog>
----
+
+== Other options
+In some cases complex updates (such as spatial/shape) may take a very long time to complete. In the default
+configuration other updates that fall into the same internal version bucket will wait indefinitely;
+these outstanding requests may pile up, leading to thread exhaustion and eventually to
+OutOfMemory errors.
+
+The option `versionBucketLockTimeoutMs` in the `updateHandler` section helps to prevent that by
+specifying a limited timeout for such extremely long-running update requests. If this limit
+is reached, the update fails, but it no longer blocks all other updates forever. See SOLR-12833 for more details.
+
+There's a memory cost associated with this setting. Values greater than the default 0 (meaning unlimited timeout)
+cause Solr to use a different internal implementation of the version bucket, which increases memory consumption
+from ~1.5MB to ~6.8MB per Solr core.
+
+An example of specifying this option under `<config>` section of `solrconfig.xml`:
+
+[source,xml]
+----
+<updateHandler class="solr.DirectUpdateHandler2">
+ ...
+ <int name="versionBucketLockTimeoutMs">10000</int>
+</updateHandler>
+----