You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2018/12/06 15:28:48 UTC
[1/2] lucene-solr:branch_7x: SOLR-12833: Add configurable timeout to
VersionBucket lock.
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x ec1d05ba9 -> 73ffc03ad
SOLR-12833: Add configurable timeout to VersionBucket lock.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e5329f27
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e5329f27
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e5329f27
Branch: refs/heads/branch_7x
Commit: e5329f27c036b1017feb15651aca6dd374584308
Parents: ec1d05b
Author: markrmiller <ma...@apache.org>
Authored: Tue Dec 4 10:01:24 2018 -0600
Committer: markrmiller <ma...@apache.org>
Committed: Thu Dec 6 09:28:15 2018 -0600
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/update/VersionBucket.java | 44 +++++++
.../org/apache/solr/update/VersionInfo.java | 13 +-
.../processor/DistributedUpdateProcessor.java | 132 +++++++++++--------
4 files changed, 131 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 778991c..4de1651 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -65,6 +65,8 @@ Improvements
* SOLR-12804: Remove static modifier from Overseer queue access. (Mark Miller)
+* SOLR-12833: Add configurable timeout to VersionBucket lock. (Jeffery Yuan, Mark Miller)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/core/src/java/org/apache/solr/update/VersionBucket.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
index c0c2826..066936a 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
@@ -16,12 +16,26 @@
*/
package org.apache.solr.update;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
// TODO: make inner?
// TODO: store the highest possible in the index on a commit (but how to not block adds?)
// TODO: could also store highest possible in the transaction log after a commit.
// Or on a new index, just scan "version" for the max?
/** @lucene.internal */
public class VersionBucket {
+ private int lockTimeoutMs;
+
+ public VersionBucket(int lockTimeoutMs) {
+ this.lockTimeoutMs = lockTimeoutMs;
+ }
+
+ private final Lock lock = new ReentrantLock(true);
+ private final Condition condition = lock.newCondition();
+
public long highest;
public void updateHighest(long val) {
@@ -29,4 +43,34 @@ public class VersionBucket {
highest = Math.max(highest, Math.abs(val));
}
}
+
+ public int getLockTimeoutMs() {
+ return lockTimeoutMs;
+ }
+
+ public boolean tryLock() {
+ try {
+ return lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void unlock() {
+ lock.unlock();
+ }
+
+ public void signalAll() {
+ condition.signalAll();
+ }
+
+ public void awaitNanos(long nanosTimeout) {
+ try {
+ condition.awaitNanos(nanosTimeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/core/src/java/org/apache/solr/update/VersionInfo.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index 7697be4..dd39043 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -45,8 +45,13 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
public class VersionInfo {
-
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS = "bucketVersionLockTimeoutMs";
+
+ /**
+ * same as default client read timeout: 10 mins
+ */
+ private static final int DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS = 600000;
private final UpdateLog ulog;
private final VersionBucket[] buckets;
@@ -54,6 +59,8 @@ public class VersionInfo {
private SchemaField idField;
final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private int versionBucketLockTimeoutMs;
+
/**
* Gets and returns the {@link org.apache.solr.common.params.CommonParams#VERSION_FIELD} from the specified
* schema, after verifying that it is indexed, stored, and single-valued.
@@ -94,9 +101,11 @@ public class VersionInfo {
IndexSchema schema = ulog.uhandler.core.getLatestSchema();
versionField = getAndCheckVersionField(schema);
idField = schema.getUniqueKeyField();
+ versionBucketLockTimeoutMs = ulog.uhandler.core.getSolrConfig().getInt("updateHandler/versionBucketLockTimeoutMs",
+ Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "" + DEFAULT_VERSION_BUCKET_LOCK_TIMEOUT_MS)));
buckets = new VersionBucket[ BitUtil.nextHighestPowerOfTwo(nBuckets) ];
for (int i=0; i<buckets.length; i++) {
- buckets[i] = new VersionBucket();
+ buckets[i] = new VersionBucket(versionBucketLockTimeoutMs);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5329f27/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index aad7ac0..eccc668 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -1027,16 +1027,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (vinfo == null) {
if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) {
- throw new SolrException
- (SolrException.ErrorCode.BAD_REQUEST,
- "Atomic document updates are not supported unless <updateLog/> is configured");
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Atomic document updates are not supported unless <updateLog/> is configured");
} else {
super.processAdd(cmd);
return false;
}
}
- // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
+ // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
+ // here)
int bucketHash = bucketHash(idBytes);
// at this point, there is an update we need to try and apply.
@@ -1076,9 +1076,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
vinfo.lockForUpdate();
- try {
- synchronized (bucket) {
- bucket.notifyAll(); //just in case anyone is waiting let them know that we have a new update
+ if (bucket.tryLock()) {
+ try {
+ bucket.signalAll();
+ // just in case anyone is waiting let them know that we have a new update
// we obtain the version when synchronized and then do the add so we can ensure that
// if version1 < version2 then version1 is actually added before version2.
@@ -1117,15 +1118,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (versionOnUpdate != 0) {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
long foundVersion = lastVersion == null ? -1 : lastVersion;
- if ( versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0) || (versionOnUpdate==1 && foundVersion > 0) ) {
+ if (versionOnUpdate == foundVersion || (versionOnUpdate < 0 && foundVersion < 0)
+ || (versionOnUpdate == 1 && foundVersion > 0)) {
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
// specified it must exist (versionOnUpdate==1) and it does.
} else {
- throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId() + " expected=" + versionOnUpdate + " actual=" + foundVersion);
+ throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getPrintableId()
+ + " expected=" + versionOnUpdate + " actual=" + foundVersion);
}
}
-
long version = vinfo.getNewClock();
cmd.setVersion(version);
cmd.getSolrInputDocument().setField(CommonParams.VERSION_FIELD, version);
@@ -1148,25 +1150,28 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this was checked for (in waitForDependentUpdates()) before entering the synchronized block.
// So we shouldn't be here, unless what must've happened is:
// by the time synchronization block was entered, the prev update was deleted by DBQ. Since
- // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
- // from the deleted list (which might be older than the prev update!)
+ // now that update is not in index, the vinfo.lookupVersion() is possibly giving us a version
+ // from the deleted list (which might be older than the prev update!)
UpdateCommand fetchedFromLeader = fetchFullUpdateFromLeader(cmd, versionOnUpdate);
if (fetchedFromLeader instanceof DeleteUpdateCommand) {
log.info("In-place update of {} failed to find valid lastVersion to apply to, and the document"
+ " was deleted at the leader subsequently.", idBytes.utf8ToString());
- versionDelete((DeleteUpdateCommand)fetchedFromLeader);
+ versionDelete((DeleteUpdateCommand) fetchedFromLeader);
return true;
} else {
assert fetchedFromLeader instanceof AddUpdateCommand;
- // Newer document was fetched from the leader. Apply that document instead of this current in-place update.
- log.info("In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
+ // Newer document was fetched from the leader. Apply that document instead of this current in-place
+ // update.
+ log.info(
+ "In-place update of {} failed to find valid lastVersion to apply to, forced to fetch full doc from leader: {}",
idBytes.utf8ToString(), fetchedFromLeader);
- // Make this update to become a non-inplace update containing the full document obtained from the leader
- cmd.solrDoc = ((AddUpdateCommand)fetchedFromLeader).solrDoc;
+ // Make this update to become a non-inplace update containing the full document obtained from the
+ // leader
+ cmd.solrDoc = ((AddUpdateCommand) fetchedFromLeader).solrDoc;
cmd.prevVersion = -1;
- cmd.setVersion((long)cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
+ cmd.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
assert cmd.isInPlaceUpdate() == false;
}
} else {
@@ -1190,11 +1195,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// in this bucket so far, so we know that no reordering has yet occurred.
bucket.updateHighest(versionOnUpdate);
} else {
- // there have been updates higher than the current update. we need to check
+ // there have been updates higher than the current update. we need to check
// the specific version for this id.
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
+ // This update is a repeat, or was reordered. We need to drop this update.
log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
return true;
}
@@ -1205,9 +1210,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
-
+
boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
-
+
SolrInputDocument clonedDoc = null;
if (willDistrib && cloneRequiredOnLeader) {
clonedDoc = cmd.solrDoc.deepCopy();
@@ -1215,16 +1220,22 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO: possibly set checkDeleteByQueries as a flag on the command?
doLocalAdd(cmd);
-
+
if (willDistrib && cloneRequiredOnLeader) {
cmd.solrDoc = clonedDoc;
}
+ } finally {
- } // end synchronized (bucket)
- } finally {
- vinfo.unlockForUpdate();
+ bucket.unlock();
+
+ vinfo.unlockForUpdate();
+ }
+ return false;
+
+ } else {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
}
- return false;
}
@VisibleForTesting
@@ -1253,31 +1264,31 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
vinfo.lockForUpdate();
- try {
- synchronized (bucket) {
+ if (bucket.tryLock()) {
+ try {
Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
- lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+ lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
- log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
- (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion, isReplayOrPeersync, cmd.getPrintableId());
+ log.debug("Re-ordered inplace update. version={}, prevVersion={}, lastVersion={}, replayOrPeerSync={}, id={}",
+ (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()), cmd.prevVersion, lastFoundVersion,
+ isReplayOrPeersync, cmd.getPrintableId());
}
- while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
- try {
- long timeLeft = waitTimeout.timeLeft(TimeUnit.MILLISECONDS);
- if (timeLeft > 0) { // wait(0) waits forever until notified, but we don't want that.
- bucket.wait(timeLeft);
- }
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
+ while (Math.abs(lastFoundVersion) < cmd.prevVersion && !waitTimeout.hasTimedOut()) {
+ bucket.awaitNanos(waitTimeout.timeLeft(TimeUnit.NANOSECONDS));
lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
- lastFoundVersion = lookedUpVersion == null ? 0L: lookedUpVersion;
+ lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
}
+ } finally {
+
+ bucket.unlock();
+
+ vinfo.unlockForUpdate();
}
- } finally {
- vinfo.unlockForUpdate();
+ } else {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
}
if (Math.abs(lastFoundVersion) > cmd.prevVersion) {
@@ -1810,7 +1821,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return false;
}
- // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash here)
+ // This is only the hash for the bucket, and must be based only on the uniqueKey (i.e. do not use a pluggable hash
+ // here)
int bucketHash = bucketHash(idBytes);
// at this point, there is an update we need to try and apply.
@@ -1823,22 +1835,21 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
}
long signedVersionOnUpdate = versionOnUpdate;
- versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
+ versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
boolean forwardedFromCollection = cmd.getReq().getParams().get(DISTRIB_FROM_COLLECTION) != null;
- if (!leaderLogic && versionOnUpdate==0) {
+ if (!leaderLogic && versionOnUpdate == 0) {
throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
}
VersionBucket bucket = vinfo.bucket(bucketHash);
vinfo.lockForUpdate();
- try {
-
- synchronized (bucket) {
+ if (bucket.tryLock()) {
+ try {
if (versionsStored) {
long bucketVersion = bucket.highest;
@@ -1864,11 +1875,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (signedVersionOnUpdate != 0) {
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
long foundVersion = lastVersion == null ? -1 : lastVersion;
- if ( (signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0) || (signedVersionOnUpdate == 1 && foundVersion > 0) ) {
+ if ((signedVersionOnUpdate == foundVersion) || (signedVersionOnUpdate < 0 && foundVersion < 0)
+ || (signedVersionOnUpdate == 1 && foundVersion > 0)) {
// we're ok if versions match, or if both are negative (all missing docs are equal), or if cmd
// specified it must exist (versionOnUpdate==1) and it does.
} else {
- throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected=" + signedVersionOnUpdate + " actual=" + foundVersion);
+ throw new SolrException(ErrorCode.CONFLICT, "version conflict for " + cmd.getId() + " expected="
+ + signedVersionOnUpdate + " actual=" + foundVersion);
}
}
@@ -1891,11 +1904,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// in this bucket so far, so we know that no reordering has yet occurred.
bucket.updateHighest(versionOnUpdate);
} else {
- // there have been updates higher than the current update. we need to check
+ // there have been updates higher than the current update. we need to check
// the specific version for this id.
Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
- // This update is a repeat, or was reordered. We need to drop this update.
+ // This update is a repeat, or was reordered. We need to drop this update.
log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
return true;
}
@@ -1909,10 +1922,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
doLocalDelete(cmd);
return false;
- } // end synchronized (bucket)
-
- } finally {
- vinfo.unlockForUpdate();
+ } finally {
+ bucket.unlock();
+ vinfo.unlockForUpdate();
+ }
+ } else {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to get version bucket lock in " + bucket.getLockTimeoutMs() + " ms");
}
}
[2/2] lucene-solr:branch_7x: SOLR-13028: @AwaitsFix
AutoAddReplicasPlanActionTest#testSimple.
Posted by ma...@apache.org.
SOLR-13028: @AwaitsFix AutoAddReplicasPlanActionTest#testSimple.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/73ffc03a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/73ffc03a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/73ffc03a
Branch: refs/heads/branch_7x
Commit: 73ffc03ad0403a5fb579fecad0550000938819e3
Parents: e5329f2
Author: markrmiller <ma...@apache.org>
Authored: Thu Dec 6 08:56:03 2018 -0600
Committer: markrmiller <ma...@apache.org>
Committed: Thu Dec 6 09:28:36 2018 -0600
----------------------------------------------------------------------
.../solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/73ffc03a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
index 1c6d4a8..593225f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
@@ -77,7 +77,7 @@ public class AutoAddReplicasPlanActionTest extends SolrCloudTestCase{
}
@Test
- @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
+ @AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13028")
public void testSimple() throws Exception {
JettySolrRunner jetty1 = cluster.getJettySolrRunner(0);
JettySolrRunner jetty2 = cluster.getJettySolrRunner(1);