You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2018/01/26 18:36:41 UTC
lucene-solr:master: SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup
Repository: lucene-solr
Updated Branches:
refs/heads/master 3856ae2d8 -> 56f3f6d94
SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/56f3f6d9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/56f3f6d9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/56f3f6d9
Branch: refs/heads/master
Commit: 56f3f6d9484dd353ac50d47717c872ca9dac16ea
Parents: 3856ae2
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Fri Jan 26 10:33:10 2018 -0800
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Fri Jan 26 10:36:12 2018 -0800
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../apache/solr/cloud/OverseerTaskQueue.java | 61 ++++++++++++--------
.../org/apache/solr/cloud/OverseerTest.java | 60 ++++++++++++++++---
3 files changed, 89 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/56f3f6d9/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a1725e7..f622374 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -108,6 +108,8 @@ New Features
* SOLR-11722: New CREATEROUTEDALIAS SolrCloud command to create a "time routed alias" over a series of collections
partitioned by time. (Gus Heck, David Smiley)
+* SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup (Tomás Fernández Löbbe, David Smiley, Dawid Weiss)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/56f3f6d9/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 4a6ea8a..3687369 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -16,13 +16,16 @@
*/
package org.apache.solr.cloud;
+import com.codahale.metrics.Timer;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
-
-import com.codahale.metrics.Timer;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.util.Pair;
@@ -108,25 +111,24 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
/**
* Watcher that blocks until a WatchedEvent occurs for a znode.
*/
- private static final class LatchWatcher implements Watcher {
+ static final class LatchWatcher implements Watcher {
- private final Object lock;
+ private final Lock lock;
+ private final Condition eventReceived;
private WatchedEvent event;
private Event.EventType latchEventType;
-
- LatchWatcher(Object lock) {
- this(lock, null);
+
+ LatchWatcher() {
+ this(null);
}
-
+
LatchWatcher(Event.EventType eventType) {
- this(new Object(), eventType);
- }
-
- LatchWatcher(Object lock, Event.EventType eventType) {
- this.lock = lock;
+ this.lock = new ReentrantLock();
+ this.eventReceived = lock.newCondition();
this.latchEventType = eventType;
}
+
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher
@@ -136,17 +138,29 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
// If latchEventType is not null, only fire if the type matches
LOG.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState(), latchEventType);
if (latchEventType == null || event.getType() == latchEventType) {
- synchronized (lock) {
+ lock.lock();
+ try {
this.event = event;
- lock.notifyAll();
+ eventReceived.signalAll();
+ } finally {
+ lock.unlock();
}
}
}
- public void await(long timeout) throws InterruptedException {
- synchronized (lock) {
- if (this.event != null) return;
- lock.wait(timeout);
+ public void await(long timeoutMs) throws InterruptedException {
+ assert timeoutMs > 0;
+ long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+ lock.lock();
+ try {
+ while (this.event == null) {
+ if (timeoutNanos <= 0) {
+ return;
+ }
+ timeoutNanos = eventReceived.awaitNanos(timeoutNanos);
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -187,17 +201,14 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
// otherwise we may miss the response.
String watchID = createResponseNode();
- Object lock = new Object();
- LatchWatcher watcher = new LatchWatcher(lock);
+ LatchWatcher watcher = new LatchWatcher();
Stat stat = zookeeper.exists(watchID, watcher, true);
// create the request node
createRequestNode(data, watchID);
- synchronized (lock) {
- if (stat != null && watcher.getWatchedEvent() == null) {
- watcher.await(timeout);
- }
+ if (stat != null) {
+ watcher.await(timeout);
}
byte[] bytes = zookeeper.getData(watchID, null, null, true);
// create the event before deleting the node, otherwise we can get the deleted
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/56f3f6d9/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 7259d38..3b46922 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -16,7 +16,13 @@
*/
package org.apache.solr.cloud;
-import javax.xml.parsers.ParserConfigurationException;
+import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -31,10 +37,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
+import javax.xml.parsers.ParserConfigurationException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
@@ -64,7 +69,10 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.WatcherEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -75,11 +83,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
-import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
@Slow
public class OverseerTest extends SolrTestCaseJ4 {
@@ -1475,5 +1478,44 @@ public class OverseerTest extends SolrTestCaseJ4 {
server.shutdown();
}
}
+
+ @Test
+ public void testLatchWatcher() throws InterruptedException {
+ OverseerTaskQueue.LatchWatcher latch1 = new OverseerTaskQueue.LatchWatcher();
+ long before = System.nanoTime();
+ latch1.await(100);
+ long after = System.nanoTime();
+ assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) > 50);
+ assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 500);// Mostly to make sure the millis->nanos->millis is not broken
+ latch1.process(new WatchedEvent(new WatcherEvent(1, 1, "/foo/bar")));
+ before = System.nanoTime();
+ latch1.await(10000);// Expecting no wait
+ after = System.nanoTime();
+ assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
+
+ final AtomicBoolean expectedEventProcessed = new AtomicBoolean(false);
+ final AtomicBoolean doneWaiting = new AtomicBoolean(false);
+ final OverseerTaskQueue.LatchWatcher latch2 = new OverseerTaskQueue.LatchWatcher(Event.EventType.NodeCreated);
+ Thread t = new Thread(()->{
+ //Process an event of a different type first, this shouldn't release the latch
+ latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeDeleted.getIntValue(), 1, "/foo/bar")));
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertFalse("Latch shouldn't have been released", doneWaiting.get());
+ // Now process the correct type of event
+ expectedEventProcessed.set(true);
+ latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeCreated.getIntValue(), 1, "/foo/bar")));
+ });
+ t.start();
+ before = System.nanoTime();
+ latch2.await(10000); // It shouldn't wait this long, t should notify the lock
+ after = System.nanoTime();
+ doneWaiting.set(true);
+ assertTrue(expectedEventProcessed.get());
+ assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
+ }
}