You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gc...@apache.org on 2015/10/14 01:21:13 UTC

svn commit: r1708538 - in /lucene/dev/branches/branch_5x/solr: CHANGES.txt core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java

Author: gchanan
Date: Tue Oct 13 23:21:13 2015
New Revision: 1708538

URL: http://svn.apache.org/viewvc?rev=1708538&view=rev
Log:
SOLR-8152: Overseer Task Processor/Queue can miss responses, leading to timeouts

Modified:
    lucene/dev/branches/branch_5x/solr/CHANGES.txt
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1708538&r1=1708537&r2=1708538&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Tue Oct 13 23:21:13 2015
@@ -157,6 +157,9 @@ Bug Fixes
 
 * SOLR-8128: Set v.locale specified locale for all LocaleConfig extending VelocityResponseWriter tools.
   (Erik Hatcher)
+
+* SOLR-8152: Overseer Task Processor/Queue can miss responses, leading to timeouts.
+  (Gregory Chanan)
   
 Optimizations
 ----------------------

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java?rev=1708538&r1=1708537&r2=1708538&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java Tue Oct 13 23:21:13 2015
@@ -29,6 +29,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,6 +92,9 @@ public class OverseerTaskQueue extends D
           + path.substring(path.lastIndexOf("-") + 1);
       if (zookeeper.exists(responsePath, true)) {
         zookeeper.setData(responsePath, event.getBytes(), true);
+      } else {
+        LOG.info("Response ZK path: " + responsePath + " doesn't exist."
+            + "  Requestor may have disconnected from ZooKeeper");
       }
       byte[] data = zookeeper.getData(path, null, null, true);
       zookeeper.delete(path, -1, true);
@@ -127,8 +131,8 @@ public class OverseerTaskQueue extends D
       Event.EventType eventType = event.getType();
       // None events are ignored
       // If latchEventType is not null, only fire if the type matches
+      LOG.info("{} fired on path {} state {} latchEventType {}", eventType, event.getPath(), event.getState(), latchEventType);
       if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
-        LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
         synchronized (lock) {
           this.event = event;
           lock.notifyAll();
@@ -176,22 +180,31 @@ public class OverseerTaskQueue extends D
       InterruptedException {
     TimerContext time = stats.time(dir + "_offer");
     try {
-      String path = createData(dir + "/" + PREFIX, data,
-          CreateMode.PERSISTENT_SEQUENTIAL);
+      // Create and watch the response node before creating the request node;
+      // otherwise we may miss the response.
       String watchID = createData(
-          dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
-          null, CreateMode.EPHEMERAL);
+          dir + "/" + response_prefix,
+          null, CreateMode.EPHEMERAL_SEQUENTIAL);
 
       Object lock = new Object();
       LatchWatcher watcher = new LatchWatcher(lock);
+      Stat stat = zookeeper.exists(watchID, watcher, true);
+
+      // create the request node
+      createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
+          data, CreateMode.PERSISTENT);
+
       synchronized (lock) {
-        if (zookeeper.exists(watchID, watcher, true) != null) {
+        if (stat != null && watcher.getWatchedEvent() == null) {
           watcher.await(timeout);
         }
       }
       byte[] bytes = zookeeper.getData(watchID, null, null, true);
+      // create the event before deleting the node, otherwise we can get the deleted
+      // event from the watcher.
+      QueueEvent event =  new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
       zookeeper.delete(watchID, -1, true);
-      return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+      return event;
     } finally {
       time.stop();
     }