You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by tf...@apache.org on 2018/01/26 18:36:41 UTC

lucene-solr:master: SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup

Repository: lucene-solr
Updated Branches:
  refs/heads/master 3856ae2d8 -> 56f3f6d94


SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/56f3f6d9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/56f3f6d9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/56f3f6d9

Branch: refs/heads/master
Commit: 56f3f6d9484dd353ac50d47717c872ca9dac16ea
Parents: 3856ae2
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Fri Jan 26 10:33:10 2018 -0800
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Fri Jan 26 10:36:12 2018 -0800

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../apache/solr/cloud/OverseerTaskQueue.java    | 61 ++++++++++++--------
 .../org/apache/solr/cloud/OverseerTest.java     | 60 ++++++++++++++++---
 3 files changed, 89 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/56f3f6d9/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a1725e7..f622374 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -108,6 +108,8 @@ New Features
 * SOLR-11722: New CREATEROUTEDALIAS SolrCloud command to create a "time routed alias" over a series of collections
   partitioned by time. (Gus Heck, David Smiley)
 
+* SOLR-11782: Refactor LatchWatcher.await to protect against spurious wakeup (Tomás Fernández Löbbe, David Smiley, Dawid Weiss)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/56f3f6d9/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 4a6ea8a..3687369 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -16,13 +16,16 @@
  */
 package org.apache.solr.cloud;
 
+import com.codahale.metrics.Timer;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
-
-import com.codahale.metrics.Timer;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.util.Pair;
@@ -108,25 +111,24 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
   /**
    * Watcher that blocks until a WatchedEvent occurs for a znode.
    */
-  private static final class LatchWatcher implements Watcher {
+  static final class LatchWatcher implements Watcher {
 
-    private final Object lock;
+    private final Lock lock;
+    private final Condition eventReceived;
     private WatchedEvent event;
     private Event.EventType latchEventType;
-
-    LatchWatcher(Object lock) {
-      this(lock, null);
+    
+    LatchWatcher() {
+      this(null);
     }
-
+    
     LatchWatcher(Event.EventType eventType) {
-      this(new Object(), eventType);
-    }
-
-    LatchWatcher(Object lock, Event.EventType eventType) {
-      this.lock = lock;
+      this.lock = new ReentrantLock();
+      this.eventReceived = lock.newCondition();
       this.latchEventType = eventType;
     }
 
+
     @Override
     public void process(WatchedEvent event) {
       // session events are not change events, and do not remove the watcher
@@ -136,17 +138,29 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
       // If latchEventType is not null, only fire if the type matches
       LOG.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState(), latchEventType);
       if (latchEventType == null || event.getType() == latchEventType) {
-        synchronized (lock) {
+        lock.lock();
+        try {
           this.event = event;
-          lock.notifyAll();
+          eventReceived.signalAll();
+        } finally {
+          lock.unlock();
         }
       }
     }
 
-    public void await(long timeout) throws InterruptedException {
-      synchronized (lock) {
-        if (this.event != null) return;
-        lock.wait(timeout);
+    public void await(long timeoutMs) throws InterruptedException {
+      assert timeoutMs > 0;
+      long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+      lock.lock();
+      try {
+        while (this.event == null) {
+          if (timeoutNanos <= 0) {
+            return;
+          }
+          timeoutNanos = eventReceived.awaitNanos(timeoutNanos);
+        }
+      } finally {
+        lock.unlock();
       }
     }
 
@@ -187,17 +201,14 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
       // otherwise we may miss the response.
       String watchID = createResponseNode();
 
-      Object lock = new Object();
-      LatchWatcher watcher = new LatchWatcher(lock);
+      LatchWatcher watcher = new LatchWatcher();
       Stat stat = zookeeper.exists(watchID, watcher, true);
 
       // create the request node
       createRequestNode(data, watchID);
 
-      synchronized (lock) {
-        if (stat != null && watcher.getWatchedEvent() == null) {
-          watcher.await(timeout);
-        }
+      if (stat != null) {
+        watcher.await(timeout);
       }
       byte[] bytes = zookeeper.getData(watchID, null, null, true);
       // create the event before deleting the node, otherwise we can get the deleted

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/56f3f6d9/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 7259d38..3b46922 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -16,7 +16,13 @@
  */
 package org.apache.solr.cloud;
 
-import javax.xml.parsers.ParserConfigurationException;
+import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
 import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -31,10 +37,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
+import javax.xml.parsers.ParserConfigurationException;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.cloud.DistributedQueue;
@@ -64,7 +69,10 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.WatcherEvent;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -75,11 +83,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
-import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 @Slow
 public class OverseerTest extends SolrTestCaseJ4 {
 
@@ -1475,5 +1478,44 @@ public class OverseerTest extends SolrTestCaseJ4 {
       server.shutdown();
     }
   }
+  
+  @Test
+  public void testLatchWatcher() throws InterruptedException {
+    OverseerTaskQueue.LatchWatcher latch1 = new OverseerTaskQueue.LatchWatcher();
+    long before = System.nanoTime();
+    latch1.await(100);
+    long after = System.nanoTime();
+    assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) > 50);
+    assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 500);// Mostly to make sure the millis->nanos->millis is not broken
+    latch1.process(new WatchedEvent(new WatcherEvent(1, 1, "/foo/bar")));
+    before = System.nanoTime();
+    latch1.await(10000);// Expecting no wait
+    after = System.nanoTime();
+    assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
+    
+    final AtomicBoolean expectedEventProcessed = new AtomicBoolean(false);
+    final AtomicBoolean doneWaiting = new AtomicBoolean(false);
+    final OverseerTaskQueue.LatchWatcher latch2 = new OverseerTaskQueue.LatchWatcher(Event.EventType.NodeCreated);
+    Thread t = new Thread(()->{
+      //Process an event of a different type first, this shouldn't release the latch
+      latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeDeleted.getIntValue(), 1, "/foo/bar")));
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      assertFalse("Latch shouldn't have been released", doneWaiting.get());
+      // Now process the correct type of event
+      expectedEventProcessed.set(true);
+      latch2.process(new WatchedEvent(new WatcherEvent(Event.EventType.NodeCreated.getIntValue(), 1, "/foo/bar")));
+    });
+    t.start();
+    before = System.nanoTime();
+    latch2.await(10000); // It shouldn't wait this long, t should notify the lock
+    after = System.nanoTime();
+    doneWaiting.set(true);
+    assertTrue(expectedEventProcessed.get());
+    assertTrue(TimeUnit.NANOSECONDS.toMillis(after-before) < 1000);
+  }
 
 }