You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2018/12/06 15:28:48 UTC

[1/2] lucene-solr:branch_7x: SOLR-12833: Add configurable timeout to VersionBucket lock.

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x ec1d05ba9 -> 73ffc03ad


SOLR-12833: Add configurable timeout to VersionBucket lock.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e5329f27
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e5329f27
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e5329f27

Branch: refs/heads/branch_7x
Commit: e5329f27c036b1017feb15651aca6dd374584308
Parents: ec1d05b
Author: markrmiller <ma...@apache.org>
Authored: Tue Dec 4 10:01:24 2018 -0600
Committer: markrmiller <ma...@apache.org>
Committed: Thu Dec 6 09:28:15 2018 -0600

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/update/VersionBucket.java   |  44 +++++++
 .../org/apache/solr/update/VersionInfo.java     |  13 +-
 .../processor/DistributedUpdateProcessor.java   | 132 +++++++++++--------
 4 files changed, 131 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 778991c..4de1651 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -65,6 +65,8 @@ Improvements
 
 * SOLR-12804: Remove static modifier from Overseer queue access. (Mark Miller)
 
+* SOLR-12833: Add configurable timeout to VersionBucket lock. (Jeffery Yuan, Mark Miller)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/core/src/java/org/apache/solr/update/VersionBucket.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
index c0c2826..066936a 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
@@ -16,12 +16,26 @@
  */
 package org.apache.solr.update;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 // TODO: make inner?
 // TODO: store the highest possible in the index on a commit (but how to not block adds?)
 // TODO: could also store highest possible in the transaction log after a commit.
 // Or on a new index, just scan "version" for the max?
 /** @lucene.internal */
 public class VersionBucket {
+  private int lockTimeoutMs;
+
+  public VersionBucket(int lockTimeoutMs) {
+    this.lockTimeoutMs = lockTimeoutMs;
+  }
+
+  private final Lock lock = new ReentrantLock(true);
+  private final Condition condition = lock.newCondition();
+
   public long highest;
 
   public void updateHighest(long val) {
@@ -29,4 +43,34 @@ public class VersionBucket {
       highest = Math.max(highest, Math.abs(val));
     }
   }
+  
+  public int getLockTimeoutMs() {
+    return lockTimeoutMs;
+  }
+  
+  public boolean tryLock() {
+    try {
+      return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void unlock() {
+    lock.unlock();
+  }
+
+  public void signalAll() {
+    condition.signalAll();
+  }
+
+  public void awaitNanos(long nanosTimeout) {
+    try {
+      condition.awaitNanos(nanosTimeout);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/core/src/java/org/apache/solr/update/VersionInfo.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index 7697be4..dd39043 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -45,8 +45,13 @@ import org.slf4j.LoggerFactory;
 import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
 
 public class VersionInfo {
-
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = "bucketVersionLockTimeoutMs";
+
+  /**
+   * same as default client read timeout: 10 mins
+   */
+  private static final int DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS = 600000;
 
   private final UpdateLog ulog;
   private final VersionBucket[] buckets;
@@ -54,6 +59,8 @@ public class VersionInfo {
   private SchemaField idField;
   final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
+  private int versionBucketLockTimeoutMs;
+
   /**
    * Gets and returns the {@link org.apache.solr.common.params.CommonParams#VERSION_FIELD} from the specified
    * schema, after verifying that it is indexed, stored, and single-valued.  
@@ -94,9 +101,11 @@ public class VersionInfo {
     IndexSchema schema = ulog.uhandler.core.getLatestSchema(); 
     versionField = getAndCheckVersionField(schema);
     idField = schema.getUniqueKeyField();
+    versionBucketLockTimeoutMs = ulog.uhandler.core.getSolrConfig().getInt("updateHandler/versionBucketLockTimeoutMs",
+        Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "" + DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS)));
     buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
     for (int i=0; i<buckets.length; i++) {
-      buckets[i] = new VersionBucket();
+      buckets[i] = new VersionBucket(versionBucketLockTimeoutMs);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index aad7ac0..eccc668 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -1027,16 +1027,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     if (vinfo == null) {
       if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) {
-        throw new SolrException
-          (SolrException.ErrorCode.BAD_REQUEST,
-           "Atomic document updates are not supported unless <updateLog/> is configured");
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Atomic document updates are not supported unless <updateLog/> is configured");
       } else {
         super.processAdd(cmd);
         return false;
       }
     }
 
-    // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
+    // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
+    // here)
     int bucketHash = bucketHash(idBytes);
 
     // at this point, there is an update we need to try and apply.
@@ -1076,9 +1076,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
 
     vinfo.lockForUpdate();
-    try {
-      synchronized (bucket) {
-        bucket.notifyAll(); //just in case anyone is waiting let them know that we have a new update
+    if (bucket.tryLock()) {
+      try {
+        bucket.signalAll();
+        // just in case anyone is waiting let them know that we have a new update
         // we obtain the version when synchronized and then do the add so we can ensure that
         // if version1 < version2 then version1 is actually added before version2.
 
@@ -1117,15 +1118,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             if (versionOnUpdate != 0) {
               Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
               long foundVersion = lastVersion == null ? -1 : lastVersion;
-              if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) {
+              if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
+                  || (versionOnUpdate == 1 && foundVersion > 0)) {
                 // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
                 // specified it must exist (versionOnUpdate==1) and it does.
               } else {
-                throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion);
+                throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
+                    + " expected=" + versionOnUpdate + " actual=" + foundVersion);
               }
             }
 
-
             long version = vinfo.getNewClock();
             cmd.setVersion(version);
             cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
@@ -1148,25 +1150,28 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 // this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
                 // So we shouldn't be here, unless what must've happened is:
                 // by the time synchronization block was entered, the prev update was deleted by DBQ. Since
-                // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version 
-                // from the deleted list (which might be older than the prev update!) 
+                // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
+                // from the deleted list (which might be older than the prev update!)
                 UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
 
                 if (fetchedFromLeader instanceof DeleteUpdateCommand) {
                   log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
                       + " was deleted at the leader subsequently.", idBytes.utf8ToString());
-                  versionDelete((DeleteUpdateCommand)fetchedFromLeader);
+                  versionDelete((DeleteUpdateCommand) fetchedFromLeader);
                   return true;
                 } else {
                   assert fetchedFromLeader instanceof AddUpdateCommand;
-                  // Newer document was fetched from the leader. Apply that document instead of this current in-place update.
-                  log.info("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
+                  // Newer document was fetched from the leader. Apply that document instead of this current in-place
+                  // update.
+                  log.info(
+                      "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
                       idBytes.utf8ToString(), fetchedFromLeader);
 
-                  // Make this update to become a non-inplace update containing the full document obtained from the leader
-                  cmd.solrDoc = ((AddUpdateCommand)fetchedFromLeader).solrDoc;
+                  // Make this update to become a non-inplace update containing the full document obtained from the
+                  // leader
+                  cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
                   cmd.prevVersion = -1;
-                  cmd.setVersion((long)cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
+                  cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
                   assert cmd.isInPlaceUpdate() == false;
                 }
               } else {
@@ -1190,11 +1195,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 // in this bucket so far, so we know that no reordering has yet occurred.
                 bucket.updateHighest(versionOnUpdate);
               } else {
-                // there have been updates higher than the current update.  we need to check
+                // there have been updates higher than the current update. we need to check
                 // the specific version for this id.
                 Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
                 if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
-                  // This update is a repeat, or was reordered.  We need to drop this update.
+                  // This update is a repeat, or was reordered. We need to drop this update.
                   log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
                   return true;
                 }
@@ -1205,9 +1210,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             }
           }
         }
-        
+
         boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
-        
+
         SolrInputDocument clonedDoc = null;
         if (willDistrib && cloneRequiredOnLeader) {
           clonedDoc = cmd.solrDoc.deepCopy();
@@ -1215,16 +1220,22 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
         // TODO: possibly set checkDeleteByQueries as a flag on the command?
         doLocalAdd(cmd);
-        
+
         if (willDistrib && cloneRequiredOnLeader) {
           cmd.solrDoc = clonedDoc;
         }
+      } finally {
 
-      }  // end synchronized (bucket)
-    } finally {
-      vinfo.unlockForUpdate();
+        bucket.unlock();
+
+        vinfo.unlockForUpdate();
+      }
+      return false;
+
+    } else {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
     }
-    return false;
   }
 
   @VisibleForTesting
@@ -1253,31 +1264,31 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
 
     vinfo.lockForUpdate();
-    try {
-      synchronized (bucket) {
+    if (bucket.tryLock()) {
+      try {
         Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
-        lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+        lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
 
         if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
-          log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}", 
-              (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion, isReplayOrPeersync, cmd.getPrintableId());
+          log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
+              (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
+              isReplayOrPeersync, cmd.getPrintableId());
         }
 
-        while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut())  {
-          try {
-            long timeLeft = waitTimeout.timeLeft(TimeUnit.MILLISECONDS);
-            if (timeLeft > 0) { // wait(0) waits forever until notified, but we don't want that.
-              bucket.wait(timeLeft);
-            }
-          } catch (InterruptedException ie) {
-            throw new RuntimeException(ie);
-          }
+        while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
+          bucket.awaitNanos(waitTimeout.timeLeft(TimeUnit.NANOSECONDS));
           lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
-          lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+          lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
         }
+      } finally {
+
+        bucket.unlock();
+
+        vinfo.unlockForUpdate();
       }
-    } finally {
-      vinfo.unlockForUpdate();
+    } else {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
     }
 
     if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
@@ -1810,7 +1821,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       return false;
     }
 
-    // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
+    // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
+    // here)
     int bucketHash = bucketHash(idBytes);
 
     // at this point, there is an update we need to try and apply.
@@ -1823,22 +1835,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
     }
     long signedVersionOnUpdate = versionOnUpdate;
-    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
+    versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
 
     boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
     boolean leaderLogic = isLeader && !isReplayOrPeersync;
     boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;
 
-    if (!leaderLogic && versionOnUpdate==0) {
+    if (!leaderLogic && versionOnUpdate == 0) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
     }
 
     VersionBucket bucket = vinfo.bucket(bucketHash);
 
     vinfo.lockForUpdate();
-    try {
-
-      synchronized (bucket) {
+    if (bucket.tryLock()) {
+      try {
         if (versionsStored) {
           long bucketVersion = bucket.highest;
 
@@ -1864,11 +1875,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             if (signedVersionOnUpdate != 0) {
               Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
               long foundVersion = lastVersion == null ? -1 : lastVersion;
-              if ( (signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0) || (signedVersionOnUpdate == 1 && foundVersion > 0) ) {
+              if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
+                  || (signedVersionOnUpdate == 1 && foundVersion > 0)) {
                 // we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
                 // specified it must exist (versionOnUpdate==1) and it does.
               } else {
-                throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion);
+                throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
+                    + signedVersionOnUpdate + " actual=" + foundVersion);
               }
             }
 
@@ -1891,11 +1904,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
               // in this bucket so far, so we know that no reordering has yet occured.
               bucket.updateHighest(versionOnUpdate);
             } else {
-              // there have been updates higher than the current update.  we need to check
+              // there have been updates higher than the current update. we need to check
               // the specific version for this id.
               Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
               if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
-                // This update is a repeat, or was reordered.  We need to drop this update.
+                // This update is a repeat, or was reordered. We need to drop this update.
                 log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
                 return true;
               }
@@ -1909,10 +1922,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
         doLocalDelete(cmd);
         return false;
-      }  // end synchronized (bucket)
-
-    } finally {
-      vinfo.unlockForUpdate();
+      } finally {
+        bucket.unlock();
+        vinfo.unlockForUpdate();
+      }
+    } else {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
     }
   }
 


[2/2] lucene-solr:branch_7x: SOLR-13028: @AwaitsFix AutoAddReplicasPlanActionTest#testSimple.

Posted by ma...@apache.org.
SOLR-13028: @AwaitsFix AutoAddReplicasPlanActionTest#testSimple.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/73ffc03a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/73ffc03a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/73ffc03a

Branch: refs/heads/branch_7x
Commit: 73ffc03ad0403a5fb579fecad0550000938819e3
Parents: e5329f2
Author: markrmiller <ma...@apache.org>
Authored: Thu Dec 6 08:56:03 2018 -0600
Committer: markrmiller <ma...@apache.org>
Committed: Thu Dec 6 09:28:36 2018 -0600

----------------------------------------------------------------------
 .../solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73ffc03a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
index 1c6d4a8..593225f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
@@ -77,7 +77,7 @@ public class AutoAddReplicasPlanActionTest extends SolrCloudTestCase{
   }
 
   @Test
-  @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
+  @AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13028")
   public void testSimple() throws Exception {
     JettySolrRunner jetty1 = cluster.getJettySolrRunner(0);
     JettySolrRunner jetty2 = cluster.getJettySolrRunner(1);