Posted to commits@lucene.apache.org by ab...@apache.org on 2019/04/30 20:31:54 UTC

[lucene-solr] branch master updated: SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 70e0907  SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.
70e0907 is described below

commit 70e090717d21a9713c45b8a52e35df940a4277df
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Apr 30 22:31:06 2019 +0200

    SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.
---
 solr/CHANGES.txt                                   |   3 +
 ...{VersionBucket.java => TimedVersionBucket.java} |  56 +--
 .../java/org/apache/solr/update/VersionBucket.java |  43 +-
 .../java/org/apache/solr/update/VersionInfo.java   |  19 +-
 .../processor/DistributedUpdateProcessor.java      | 480 +++++++++++----------
 .../processor/DistributedUpdateProcessorTest.java  | 163 ++++++-
 .../src/updatehandlers-in-solrconfig.adoc          |  24 ++
 7 files changed, 483 insertions(+), 305 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ab5e588..e8d455b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -266,6 +266,9 @@ Improvements
 * SOLR-13432: Add .toString methods to BitDocSet and SortedIntDocSet so that enabling "showItems" on the filter caches
   shows some useful information about the values in the cache. (shalin)
 
+* SOLR-12833: Avoid unnecessary memory cost when DistributedUpdateProcessor timed-out lock is not used.
+  (jefferyyuan, ab)
+
 
 Other Changes
 ----------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
similarity index 67%
copy from solr/core/src/java/org/apache/solr/update/VersionBucket.java
copy to solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
index 066936a..b8af912 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
@@ -16,44 +16,35 @@
  */
 package org.apache.solr.update;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-// TODO: make inner?
-// TODO: store the highest possible in the index on a commit (but how to not block adds?)
-// TODO: could also store highest possible in the transaction log after a commit.
-// Or on a new index, just scan "version" for the max?
-/** @lucene.internal */
-public class VersionBucket {
-  private int lockTimeoutMs;
+import org.apache.solr.common.SolrException;
 
-  public VersionBucket(int lockTimeoutMs) {
-    this.lockTimeoutMs = lockTimeoutMs;
-  }
+/** @lucene.internal */
+/**
+ * This implementation uses a lock and a condition, and throws an exception if it cannot obtain the lock within
+ * <code>lockTimeoutMs</code>.
+ */
+public class TimedVersionBucket extends VersionBucket {
 
   private final Lock lock = new ReentrantLock(true);
   private final Condition condition = lock.newCondition();
 
-  public long highest;
-
-  public void updateHighest(long val) {
-    if (highest != 0) {
-      highest = Math.max(highest, Math.abs(val));
-    }
-  }
-  
-  public int getLockTimeoutMs() {
-    return lockTimeoutMs;
-  }
-  
-  public boolean tryLock() {
-    try {
-      return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
+  /**
+   * Runs the function while holding the lock, and throws an exception if the lock cannot be obtained within
+   * <code>lockTimeoutMs</code>.
+   */
+  @Override
+  public <T,R> R runWithLock(int lockTimeoutMs, CheckedFunction<T,R> function) throws IOException {
+    if (tryLock(lockTimeoutMs)) {
+      return function.apply();
+    } else {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Unable to get version bucket lock in " + lockTimeoutMs + " ms");
     }
   }
 
@@ -73,4 +64,13 @@ public class VersionBucket {
       throw new RuntimeException(e);
     }
   }
+
+  protected boolean tryLock(int lockTimeoutMs) {
+    try {
+      return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
 }
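
For reference, a minimal caller-side sketch of the new API (not part of this
commit; the class and method names are illustrative): the work is passed to
runWithLock(), and the function releases the lock in its own finally block,
mirroring the pattern used in DistributedUpdateProcessor further down. The base
VersionBucket ignores the timeout argument, while TimedVersionBucket throws a
SolrException if the lock cannot be acquired within lockTimeoutMs.

    import java.io.IOException;

    import org.apache.solr.update.TimedVersionBucket;
    import org.apache.solr.update.VersionBucket;

    public class VersionBucketUsageSketch {

      // Reads bucket.highest while holding the bucket lock. The lambda unlocks in its
      // own finally block, just as doVersionAdd()/doVersionDelete() do in this patch.
      static long readHighest(VersionBucket bucket, int lockTimeoutMs) throws IOException {
        Long highest = bucket.runWithLock(lockTimeoutMs, () -> {
          try {
            return bucket.highest; // auto-boxed to Long
          } finally {
            bucket.unlock();       // no-op for the base class, Lock.unlock() for TimedVersionBucket
          }
        });
        return highest;
      }

      public static void main(String[] args) throws IOException {
        // Default bucket: intrinsic monitor, the timeout value is ignored.
        System.out.println(readHighest(new VersionBucket(), 0));
        // Timed bucket: throws SolrException if the lock is not acquired within 100 ms.
        System.out.println(readHighest(new TimedVersionBucket(), 100));
      }
    }
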
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
index 066936a..96ea1ca 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
@@ -16,26 +16,19 @@
  */
 package org.apache.solr.update;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 // TODO: make inner?
 // TODO: store the highest possible in the index on a commit (but how to not block adds?)
 // TODO: could also store highest possible in the transaction log after a commit.
 // Or on a new index, just scan "version" for the max?
 /** @lucene.internal */
+/**
+ * The default implementation which uses the intrinsic object monitor.
+ * It uses less memory but ignores the <code>lockTimeoutMs</code>.
+ */
 public class VersionBucket {
-  private int lockTimeoutMs;
-
-  public VersionBucket(int lockTimeoutMs) {
-    this.lockTimeoutMs = lockTimeoutMs;
-  }
-
-  private final Lock lock = new ReentrantLock(true);
-  private final Condition condition = lock.newCondition();
-
   public long highest;
 
   public void updateHighest(long val) {
@@ -44,32 +37,34 @@ public class VersionBucket {
     }
   }
   
-  public int getLockTimeoutMs() {
-    return lockTimeoutMs;
+  @FunctionalInterface
+  public interface CheckedFunction<T, R> {
+     R apply() throws IOException;
   }
   
-  public boolean tryLock() {
-    try {
-      return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
+  /**
+   * Runs the function while synchronized on the intrinsic object monitor.
+   */
+  public <T, R> R runWithLock(int lockTimeoutMs, CheckedFunction<T, R> function) throws IOException {
+    synchronized (this) {
+      return function.apply();
     }
   }
 
+  /**
+   * Nothing to do for the intrinsic object monitor.
+   */
   public void unlock() {
-    lock.unlock();
   }
 
   public void signalAll() {
-    condition.signalAll();
+    notifyAll();
   }
 
   public void awaitNanos(long nanosTimeout) {
     try {
-      condition.awaitNanos(nanosTimeout);
+      wait(TimeUnit.NANOSECONDS.toMillis(nanosTimeout));
     } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
       throw new RuntimeException(e);
     }
   }
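
A short sketch (again not part of this commit; the helper names and the
LongSupplier parameter are assumptions) of the wait/signal contract these
methods expose: awaitNanos() and signalAll() are only valid while the bucket
lock is held, i.e. inside runWithLock(), which is how doWaitForDependentUpdates()
and doVersionAdd() use them below. The base class delegates to
Object.wait()/notifyAll(), while TimedVersionBucket delegates to its Condition.

    import java.io.IOException;
    import java.util.function.LongSupplier;

    import org.apache.solr.update.VersionBucket;

    public class BucketWaitSketch {

      // Waits until currentVersion.getAsLong() reaches wantedVersion or the deadline
      // passes, re-checking whenever another thread signals the bucket. The whole loop
      // runs inside runWithLock(), so the monitor/lock is held across awaitNanos(),
      // mirroring doWaitForDependentUpdates() in DistributedUpdateProcessor.
      static boolean waitForVersion(VersionBucket bucket, LongSupplier currentVersion,
          long wantedVersion, int lockTimeoutMs, long maxWaitNanos) throws IOException {
        Boolean reached = bucket.runWithLock(lockTimeoutMs, () -> {
          try {
            long deadline = System.nanoTime() + maxWaitNanos;
            while (currentVersion.getAsLong() < wantedVersion) {
              long left = deadline - System.nanoTime();
              if (left <= 0) {
                return false; // timed out waiting for the dependent update
              }
              bucket.awaitNanos(left);
            }
            return true;
          } finally {
            bucket.unlock();
          }
        });
        return reached;
      }

      // The thread that applies the awaited update wakes the waiters so they re-check.
      static void signalApplied(VersionBucket bucket, int lockTimeoutMs) throws IOException {
        bucket.runWithLock(lockTimeoutMs, () -> {
          try {
            bucket.signalAll();
            return null;
          } finally {
            bucket.unlock();
          }
        });
      }
    }
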
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index dd39043..a0cdcda 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -48,15 +48,9 @@ public class VersionInfo {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = "bucketVersionLockTimeoutMs";
 
-  /**
-   * same as default client read timeout: 10 mins
-   */
-  private static final int DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS = 600000;
-
   private final UpdateLog ulog;
   private final VersionBucket[] buckets;
   private SchemaField versionField;
-  private SchemaField idField;
   final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   private int versionBucketLockTimeoutMs;
@@ -100,14 +94,21 @@ public class VersionInfo {
     this.ulog = ulog;
     IndexSchema schema = ulog.uhandler.core.getLatestSchema(); 
     versionField = getAndCheckVersionField(schema);
-    idField = schema.getUniqueKeyField();
     versionBucketLockTimeoutMs = ulog.uhandler.core.getSolrConfig().getInt("updateHandler/versionBucketLockTimeoutMs",
-        Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "" + DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS)));
+        Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "0")));
     buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
     for (int i=0; i<buckets.length; i++) {
-      buckets[i] = new VersionBucket(versionBucketLockTimeoutMs);
+      if (versionBucketLockTimeoutMs > 0) {
+        buckets[i] = new TimedVersionBucket();
+      } else {
+        buckets[i] = new VersionBucket();
+      }
     }
   }
+  
+  public int getVersionBucketLockTimeoutMs() {
+    return versionBucketLockTimeoutMs;
+  }
 
   public void reload() {
   }
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index bf1255a..ed48c75 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -124,7 +124,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private final AtomicUpdateDocumentMerger docMerger;
 
   private final UpdateLog ulog;
-  private final VersionInfo vinfo;
+  @VisibleForTesting
+  VersionInfo vinfo;
   private final boolean versionsStored;
   private boolean returnVersions;
 
@@ -331,166 +332,170 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
 
     vinfo.lockForUpdate();
-    if (bucket.tryLock()) {
-      try {
-        bucket.signalAll();
-        // just in case anyone is waiting let them know that we have a new update
-        // we obtain the version when synchronized and then do the add so we can ensure that
-        // if version1 < version2 then version1 is actually added before version2.
-
-        // even if we don't store the version field, synchronizing on the bucket
-        // will enable us to know what version happened first, and thus enable
-        // realtime-get to work reliably.
-        // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
-        // there may be other reasons in the future for a version on the commands
-
-        if (versionsStored) {
-
-          long bucketVersion = bucket.highest;
-
-          if (leaderLogic) {
+    try {
+      long finalVersionOnUpdate = versionOnUpdate;
+      return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionAdd(cmd, finalVersionOnUpdate, isReplayOrPeersync, leaderLogic, forwardedFromCollection, bucket));
+    } finally {
+      vinfo.unlockForUpdate();
+    }
+  }
 
-            if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
-              // forwarded from a collection but we are not buffering so strip original version and apply our own
-              // see SOLR-5308
-              log.info("Removing version field from doc: " + cmd.getPrintableId());
-              cmd.solrDoc.remove(CommonParams.VERSION_FIELD);
-              versionOnUpdate = 0;
-            }
+  private boolean doVersionAdd(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync,
+      boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket) throws IOException {
+    try {
+      BytesRef idBytes = cmd.getIndexedId();
+      bucket.signalAll();
+      // just in case anyone is waiting let them know that we have a new update
+      // we obtain the version when synchronized and then do the add so we can ensure that
+      // if version1 < version2 then version1 is actually added before version2.
+
+      // even if we don't store the version field, synchronizing on the bucket
+      // will enable us to know what version happened first, and thus enable
+      // realtime-get to work reliably.
+      // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
+      // there may be other reasons in the future for a version on the commands
+
+      if (versionsStored) {
+
+        long bucketVersion = bucket.highest;
+
+        if (leaderLogic) {
+
+          if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
+            // forwarded from a collection but we are not buffering so strip original version and apply our own
+            // see SOLR-5308
+            log.info("Removing version field from doc: " + cmd.getPrintableId());
+            cmd.solrDoc.remove(CommonParams.VERSION_FIELD);
+            versionOnUpdate = 0;
+          }
 
-            getUpdatedDocument(cmd, versionOnUpdate);
+          getUpdatedDocument(cmd, versionOnUpdate);
 
-            // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
-            if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
-                && isReplayOrPeersync == false) {
-              // we're not in an active state, and this update isn't from a replay, so buffer it.
-              log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
-              cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
-              ulog.add(cmd);
-              return true;
-            }
+          // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
+          if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
+              && isReplayOrPeersync == false) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            log.info("Leader logic applied but update log is buffering: " + cmd.getPrintableId());
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.add(cmd);
+            return true;
+          }
 
-            if (versionOnUpdate != 0) {
-              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
-              long foundVersion = lastVersion == null ? -1 : lastVersion;
-              if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
-                  || (versionOnUpdate == 1 && foundVersion > 0)) {
-                // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
-                // specified it must exist (versionOnUpdate==1) and it does.
-              } else {
-                throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
-                    + " expected=" + versionOnUpdate + " actual=" + foundVersion);
-              }
+          if (versionOnUpdate != 0) {
+            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+            long foundVersion = lastVersion == null ? -1 : lastVersion;
+            if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
+                || (versionOnUpdate == 1 && foundVersion > 0)) {
+              // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
+              // specified it must exist (versionOnUpdate==1) and it does.
+            } else {
+              throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
+                  + " expected=" + versionOnUpdate + " actual=" + foundVersion);
             }
+          }
 
-            long version = vinfo.getNewClock();
-            cmd.setVersion(version);
-            cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
-            bucket.updateHighest(version);
-          } else {
-            // The leader forwarded us this update.
-            cmd.setVersion(versionOnUpdate);
-
-            if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) {
-              // we're not in an active state, and this update isn't from a replay, so buffer it.
-              cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
-              ulog.add(cmd);
-              return true;
-            }
+          long version = vinfo.getNewClock();
+          cmd.setVersion(version);
+          cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
+          bucket.updateHighest(version);
+        } else {
+          // The leader forwarded us this update.
+          cmd.setVersion(versionOnUpdate);
+
+          if (shouldBufferUpdate(cmd, isReplayOrPeersync, ulog.getState())) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.add(cmd);
+            return true;
+          }
 
-            if (cmd.isInPlaceUpdate()) {
-              long prev = cmd.prevVersion;
-              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
-              if (lastVersion == null || Math.abs(lastVersion) < prev) {
-                // this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
-                // So we shouldn't be here, unless what must've happened is:
-                // by the time synchronization block was entered, the prev update was deleted by DBQ. Since
-                // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
-                // from the deleted list (which might be older than the prev update!)
-                UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
-
-                if (fetchedFromLeader instanceof DeleteUpdateCommand) {
-                  log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
-                      + " was deleted at the leader subsequently.", idBytes.utf8ToString());
-                  versionDelete((DeleteUpdateCommand) fetchedFromLeader);
-                  return true;
-                } else {
-                  assert fetchedFromLeader instanceof AddUpdateCommand;
-                  // Newer document was fetched from the leader. Apply that document instead of this current in-place
-                  // update.
-                  log.info(
-                      "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
-                      idBytes.utf8ToString(), fetchedFromLeader);
-
-                  // Make this update to become a non-inplace update containing the full document obtained from the
-                  // leader
-                  cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
-                  cmd.prevVersion = -1;
-                  cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
-                  assert cmd.isInPlaceUpdate() == false;
-                }
+          if (cmd.isInPlaceUpdate()) {
+            long prev = cmd.prevVersion;
+            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+            if (lastVersion == null || Math.abs(lastVersion) < prev) {
+              // this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
+              // So we shouldn't be here, unless what must've happened is:
+              // by the time synchronization block was entered, the prev update was deleted by DBQ. Since
+              // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
+              // from the deleted list (which might be older than the prev update!)
+              UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
+
+              if (fetchedFromLeader instanceof DeleteUpdateCommand) {
+                log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
+                    + " was deleted at the leader subsequently.", idBytes.utf8ToString());
+                versionDelete((DeleteUpdateCommand) fetchedFromLeader);
+                return true;
               } else {
-                if (lastVersion != null && Math.abs(lastVersion) > prev) {
-                  // this means we got a newer full doc update and in that case it makes no sense to apply the older
-                  // inplace update. Drop this update
-                  log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
-                      + ". Dropping current update.");
-                  return true;
-                } else {
-                  // We're good, we should apply this update. First, update the bucket's highest.
-                  if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
-                    bucket.updateHighest(versionOnUpdate);
-                  }
-                }
+                assert fetchedFromLeader instanceof AddUpdateCommand;
+                // Newer document was fetched from the leader. Apply that document instead of this current in-place
+                // update.
+                log.info(
+                    "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
+                    idBytes.utf8ToString(), fetchedFromLeader);
+
+                // Make this update to become a non-inplace update containing the full document obtained from the
+                // leader
+                cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
+                cmd.prevVersion = -1;
+                cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
+                assert cmd.isInPlaceUpdate() == false;
               }
             } else {
-              // if we aren't the leader, then we need to check that updates were not re-ordered
-              if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
-                // we're OK... this update has a version higher than anything we've seen
-                // in this bucket so far, so we know that no reordering has yet occurred.
-                bucket.updateHighest(versionOnUpdate);
+              if (lastVersion != null && Math.abs(lastVersion) > prev) {
+                // this means we got a newer full doc update and in that case it makes no sense to apply the older
+                // inplace update. Drop this update
+                log.info("Update was applied on version: " + prev + ", but last version I have is: " + lastVersion
+                    + ". Dropping current update.");
+                return true;
               } else {
-                // there have been updates higher than the current update. we need to check
-                // the specific version for this id.
-                Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
-                if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
-                  // This update is a repeat, or was reordered. We need to drop this update.
-                  log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
-                  return true;
+                // We're good, we should apply this update. First, update the bucket's highest.
+                if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+                  bucket.updateHighest(versionOnUpdate);
                 }
               }
             }
-            if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
-              cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+          } else {
+            // if we aren't the leader, then we need to check that updates were not re-ordered
+            if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+              // we're OK... this update has a version higher than anything we've seen
+              // in this bucket so far, so we know that no reordering has yet occurred.
+              bucket.updateHighest(versionOnUpdate);
+            } else {
+              // there have been updates higher than the current update. we need to check
+              // the specific version for this id.
+              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+              if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+                // This update is a repeat, or was reordered. We need to drop this update.
+                log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
+                return true;
+              }
             }
           }
+          if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+          }
         }
+      }
 
-        SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
+      SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
 
-        // TODO: possibly set checkDeleteByQueries as a flag on the command?
-        doLocalAdd(cmd);
+      // TODO: possibly set checkDeleteByQueries as a flag on the command?
+      doLocalAdd(cmd);
 
-        // if the update updates a doc that is part of a nested structure,
-        // force open a realTimeSearcher to trigger a ulog cache refresh.
-        // This refresh makes RTG handler aware of this update.q
-        if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
-          ulog.openRealtimeSearcher();
-        }
-
-        if (clonedDoc != null) {
-          cmd.solrDoc = clonedDoc;
-        }
-      } finally {
-        bucket.unlock();
-        vinfo.unlockForUpdate();
+      // if the update updates a doc that is part of a nested structure,
+      // force open a realTimeSearcher to trigger a ulog cache refresh.
+      // This refresh makes the RTG handler aware of this update.
+      if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
+        ulog.openRealtimeSearcher();
       }
-      return false;
 
-    } else {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
+      if (clonedDoc != null) {
+        cmd.solrDoc = clonedDoc;
+      }
+    } finally {
+      bucket.unlock();
     }
+    return false;
   }
 
   /**
@@ -527,31 +532,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
 
     vinfo.lockForUpdate();
-    if (bucket.tryLock()) {
-      try {
-        Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
-        lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
-
-        if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
-          log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
-              (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
-              isReplayOrPeersync, cmd.getPrintableId());
-        }
-
-        while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
-          bucket.awaitNanos(waitTimeout.timeLeft(TimeUnit.NANOSECONDS));
-          lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
-          lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
-        }
-      } finally {
-
-        bucket.unlock();
-
-        vinfo.unlockForUpdate();
-      }
-    } else {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
+    try {
+      lastFoundVersion = bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doWaitForDependentUpdates(cmd, versionOnUpdate, isReplayOrPeersync, bucket, waitTimeout));
+    } finally {
+      vinfo.unlockForUpdate();
     }
 
     if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
@@ -590,6 +574,33 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     return missingUpdate.getVersion();
   }
 
+  private long doWaitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync, VersionBucket bucket,
+      TimeOut waitTimeout) {
+    long lastFoundVersion;
+    try {
+      Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+      lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
+
+      if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
+        log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
+            (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
+            isReplayOrPeersync, cmd.getPrintableId());
+      }
+
+      while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
+        long timeLeftInNanos = waitTimeout.timeLeft(TimeUnit.NANOSECONDS);
+        if(timeLeftInNanos > 0) { // 0 means: wait forever until notified, but we don't want that.
+          bucket.awaitNanos(timeLeftInNanos);
+        }
+        lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+        lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
+      }
+    } finally {
+      bucket.unlock();
+    }
+    return lastFoundVersion;
+  }
+
   /**
    * This method is used when an update on which a particular in-place update has been lost for some reason. This method
    * sends a request to the shard leader to fetch the latest full document as seen on the leader.
@@ -938,87 +949,94 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     VersionBucket bucket = vinfo.bucket(bucketHash);
 
     vinfo.lockForUpdate();
-    if (bucket.tryLock()) {
-      try {
-        if (versionsStored) {
-          long bucketVersion = bucket.highest;
+    try {
+      long finalVersionOnUpdate = versionOnUpdate;
+      return bucket.runWithLock(vinfo.getVersionBucketLockTimeoutMs(), () -> doVersionDelete(cmd, finalVersionOnUpdate, signedVersionOnUpdate, isReplayOrPeersync, leaderLogic,
+          forwardedFromCollection, bucket));
+    } finally {
+      vinfo.unlockForUpdate();
+    }
+  }
 
-          if (leaderLogic) {
+  private boolean doVersionDelete(DeleteUpdateCommand cmd, long versionOnUpdate, long signedVersionOnUpdate,
+      boolean isReplayOrPeersync, boolean leaderLogic, boolean forwardedFromCollection, VersionBucket bucket)
+      throws IOException {
+    try {
+      BytesRef idBytes = cmd.getIndexedId();
+      if (versionsStored) {
+        long bucketVersion = bucket.highest;
 
-            if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
-              // forwarded from a collection but we are not buffering so strip original version and apply our own
-              // see SOLR-5308
-              log.info("Removing version field from doc: " + cmd.getId());
-              versionOnUpdate = signedVersionOnUpdate = 0;
-            }
+        if (leaderLogic) {
 
-            // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
-            if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
-                && !isReplayOrPeersync) {
-              // we're not in an active state, and this update isn't from a replay, so buffer it.
-              log.info("Leader logic applied but update log is buffering: " + cmd.getId());
-              cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
-              ulog.delete(cmd);
-              return true;
-            }
+          if (forwardedFromCollection && ulog.getState() == UpdateLog.State.ACTIVE) {
+            // forwarded from a collection but we are not buffering so strip original version and apply our own
+            // see SOLR-5308
+            log.info("Removing version field from doc: " + cmd.getId());
+            versionOnUpdate = signedVersionOnUpdate = 0;
+          }
 
-            if (signedVersionOnUpdate != 0) {
-              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
-              long foundVersion = lastVersion == null ? -1 : lastVersion;
-              if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
-                  || (signedVersionOnUpdate == 1 && foundVersion > 0)) {
-                // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
-                // specified it must exist (versionOnUpdate==1) and it does.
-              } else {
-                throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
-                    + signedVersionOnUpdate + " actual=" + foundVersion);
-              }
+          // leaders can also be in buffering state during "migrate" API call, see SOLR-5308
+          if (forwardedFromCollection && ulog.getState() != UpdateLog.State.ACTIVE
+              && !isReplayOrPeersync) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            log.info("Leader logic applied but update log is buffering: " + cmd.getId());
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.delete(cmd);
+            return true;
+          }
+
+          if (signedVersionOnUpdate != 0) {
+            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+            long foundVersion = lastVersion == null ? -1 : lastVersion;
+            if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
+                || (signedVersionOnUpdate == 1 && foundVersion > 0)) {
+              // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
+              // specified it must exist (versionOnUpdate==1) and it does.
+            } else {
+              throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
+                  + signedVersionOnUpdate + " actual=" + foundVersion);
             }
+          }
 
-            long version = vinfo.getNewClock();
-            cmd.setVersion(-version);
-            bucket.updateHighest(version);
-          } else {
-            cmd.setVersion(-versionOnUpdate);
+          long version = vinfo.getNewClock();
+          cmd.setVersion(-version);
+          bucket.updateHighest(version);
+        } else {
+          cmd.setVersion(-versionOnUpdate);
+
+          if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.delete(cmd);
+            return true;
+          }
 
-            if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
-              // we're not in an active state, and this update isn't from a replay, so buffer it.
-              cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
-              ulog.delete(cmd);
+          // if we aren't the leader, then we need to check that updates were not re-ordered
+          if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
+            // we're OK... this update has a version higher than anything we've seen
+            // in this bucket so far, so we know that no reordering has yet occurred.
+            bucket.updateHighest(versionOnUpdate);
+          } else {
+            // there have been updates higher than the current update. we need to check
+            // the specific version for this id.
+            Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
+            if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
+              // This update is a repeat, or was reordered. We need to drop this update.
+              log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
               return true;
             }
+          }
 
-            // if we aren't the leader, then we need to check that updates were not re-ordered
-            if (bucketVersion != 0 && bucketVersion < versionOnUpdate) {
-              // we're OK... this update has a version higher than anything we've seen
-              // in this bucket so far, so we know that no reordering has yet occurred.
-              bucket.updateHighest(versionOnUpdate);
-            } else {
-              // there have been updates higher than the current update. we need to check
-              // the specific version for this id.
-              Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
-              if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
-                // This update is a repeat, or was reordered. We need to drop this update.
-                log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
-                return true;
-              }
-            }
-
-            if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
-              cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
-            }
+          if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
           }
         }
-
-        doLocalDelete(cmd);
-        return false;
-      } finally {
-        bucket.unlock();
-        vinfo.unlockForUpdate();
       }
-    } else {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
+
+      doLocalDelete(cmd);
+      return false;
+    } finally {
+      bucket.unlock();
     }
   }
 
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index a4c54d1..33820ac 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -17,36 +17,173 @@
 
 package org.apache.solr.update.processor;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.TimedVersionBucket;
 import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.VersionInfo;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
 
 public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
 
+  @Rule 
+  public MockitoRule rule = MockitoJUnit.rule();
+  private static ExecutorService executor;
+
   @BeforeClass
   public static void beforeClass() throws Exception {
-    initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal.xml");
+    executor = Executors.newCachedThreadPool();
+    initCore("solr/collection1/conf/solrconfig.xml","solr/collection1/conf/schema-minimal-with-another-uniqkey.xml");
+  }
+  
+  @AfterClass
+  public static void AfterClass() {
+    executor.shutdown();
   }
 
   @Test
-  public void testShouldBufferUpdateZk() {
+  public void testShouldBufferUpdateZk() throws IOException {
     SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
-    DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
-        req, null, null, null);
-    AddUpdateCommand cmd = new AddUpdateCommand(req);
-    // applying buffer updates, isReplayOrPeerSync flag doesn't matter
-    assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
-    assertFalse(processor.shouldBufferUpdate(cmd, true, UpdateLog.State.APPLYING_BUFFERED));
-
-    assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.BUFFERING));
-    // this is not an buffer updates and it depend on other updates
-    cmd.prevVersion = 10;
-    assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
+    try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
+        req, null, null, null)) {
+      AddUpdateCommand cmd = new AddUpdateCommand(req);
+      // applying buffer updates, isReplayOrPeerSync flag doesn't matter
+      assertFalse(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
+      assertFalse(processor.shouldBufferUpdate(cmd, true, UpdateLog.State.APPLYING_BUFFERED));
+  
+      assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.BUFFERING));
+      // this is not a buffered update and it depends on other updates
+      cmd.prevVersion = 10;
+      assertTrue(processor.shouldBufferUpdate(cmd, false, UpdateLog.State.APPLYING_BUFFERED));
+    }
+  }
+  
+  @Test
+  public void testVersionAdd() throws IOException {
+    SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
+    int threads = 5;
+    Function<DistributedUpdateProcessor,Boolean> versionAddFunc = (DistributedUpdateProcessor process) -> {
+      try {
+        AddUpdateCommand cmd = new AddUpdateCommand(req);
+        cmd.solrDoc = new SolrInputDocument();
+        cmd.solrDoc.setField("notid", "10");
+        return process.versionAdd(cmd);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+    int succeeded = runCommands(threads, 1000, req, versionAddFunc);
+    // only one should succeed
+    assertThat(succeeded, is(1));
+
+    succeeded = runCommands(threads, -1, req, versionAddFunc);
+    // all should succeed
+    assertThat(succeeded, is(threads));
   }
 
+  @Test
+  public void testVersionDelete() throws IOException {
+    SolrQueryRequest req = new LocalSolrQueryRequest(h.getCore(), new ModifiableSolrParams());
+
+    int threads = 5;
+    Function<DistributedUpdateProcessor,Boolean> versionDeleteFunc = (DistributedUpdateProcessor process) -> {
+      try {
+        DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+        cmd.id = "1";
+        return process.versionDelete(cmd);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+    int succeeded = runCommands(threads, 1000, req, versionDeleteFunc);
+    // only one should succeed
+    assertThat(succeeded, is(1));
+
+    succeeded = runCommands(threads, -1, req, versionDeleteFunc);
+    // all should succeed
+    assertThat(succeeded, is(threads));
+  }
+  
+  /**
+   * @return how many requests succeeded
+   */
+  private int runCommands(int threads, int versionBucketLockTimeoutMs, SolrQueryRequest req,
+      Function<DistributedUpdateProcessor,Boolean> function)
+      throws IOException {
+    try (DistributedUpdateProcessor processor = new DistributedUpdateProcessor(
+        req, null, null, null)) {
+      if (versionBucketLockTimeoutMs > 0) {
+        // use TimedVersionBucket with versionBucketLockTimeoutMs
+        VersionInfo vinfo = Mockito.spy(processor.vinfo);
+        processor.vinfo = vinfo;
+
+        doReturn(new TimedVersionBucket() {
+          /**
+           * simulate the case: it takes 5 seconds to add the doc
+           * 
+           */
+          @Override
+          protected boolean tryLock(int lockTimeoutMs) {
+            boolean locked = super.tryLock(versionBucketLockTimeoutMs);
+            if (locked) {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            }
+            return locked;
+          }
+        }).when(vinfo).bucket(anyInt());
+      }
+      CountDownLatch latch = new CountDownLatch(1);
+      Collection<Future<Boolean>> futures = new ArrayList<>();
+      for (int t = 0; t < threads; ++t) {
+        futures.add(executor.submit(() -> {
+          latch.await();
+          return function.apply(processor);
+        }));
+      }
+      latch.countDown();
+
+      int succeeded = 0;
+      for (Future<Boolean> f : futures) {
+        try {
+          f.get();
+          succeeded++;
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          // do nothing
+        }
+      }
+      return succeeded;
+    }
+  }
 }
diff --git a/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc b/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc
index b0f3e3a..7d7132d 100644
--- a/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc
+++ b/solr/solr-ref-guide/src/updatehandlers-in-solrconfig.adoc
@@ -131,3 +131,27 @@ An example, to be included under `<config><updateHandler>` in `solrconfig.xml`,
   <int name="numVersionBuckets">65536</int>
 </updateLog>
 ----
+
+== Other options
+In some cases complex updates (such as spatial/shape) may take a very long time to complete. In the default
+configuration, other updates that fall into the same internal version bucket will wait indefinitely, and
+these outstanding requests may pile up, leading to thread exhaustion and eventually to
+OutOfMemory errors.
+
+The `versionBucketLockTimeoutMs` option in the `updateHandler` section helps prevent this by
+specifying a limited timeout for such extremely long-running update requests. If this limit
+is reached, the update fails, but it no longer blocks all other updates indefinitely. See SOLR-12833 for more details.
+
+There's a memory cost associated with this setting. Values greater than the default 0 (meaning unlimited timeout)
+cause Solr to use a different internal implementation of the version bucket, which increases memory consumption
+from ~1.5MB to ~6.8MB per Solr core.
+
+An example of specifying this option under the `<config>` section of `solrconfig.xml`:
+
+[source,xml]
+----
+<updateHandler class="solr.DirectUpdateHandler2">
+  ...
+  <int name="versionBucketLockTimeoutMs">10000</int>
+</updateHandler>
+----
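
To make the new failure mode concrete, a hypothetical SolrJ-side sketch (not
part of this commit; the URL, field values and error handling are assumptions):
with versionBucketLockTimeoutMs greater than 0, an update that cannot obtain its
version bucket lock in time is rejected with a SERVER_ERROR whose message
contains "Unable to get version bucket lock in <timeout> ms", so the client sees
a failed request it can retry instead of a request that blocks indefinitely.

    import java.io.IOException;

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.common.SolrException;
    import org.apache.solr.common.SolrInputDocument;

    public class BucketTimeoutClientSketch {
      public static void main(String[] args) throws IOException, SolrServerException {
        // Placeholder core/collection URL.
        try (SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/mycollection").build()) {
          SolrInputDocument doc = new SolrInputDocument();
          doc.setField("id", "1");
          try {
            client.add(doc);
            client.commit();
          } catch (SolrException e) {
            // Expected shape of the failure when the version bucket lock times out:
            // code 500 (SERVER_ERROR) and a message containing
            // "Unable to get version bucket lock in <timeout> ms".
            // A caller could log and retry here rather than waiting forever.
          }
        }
      }
    }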