You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2014/10/30 19:19:47 UTC

svn commit: r1635573 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/cloud/DistributedQueue.java core/src/test/org/apache/solr/cloud/DistributedQueueTest.java

Author: thelabdude
Date: Thu Oct 30 18:19:46 2014
New Revision: 1635573

URL: http://svn.apache.org/r1635573
Log:
SOLR-6631: Part deux - refactor LatchChildWatcher to LatchWatcher that takes an optional EventType during construction to only release the latch when a specific event type is received.

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1635573&r1=1635572&r2=1635573&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Oct 30 18:19:46 2014
@@ -278,6 +278,10 @@ Bug Fixes
 * SOLR-6591: Overseer can use stale cluster state and lose updates for collections
   with stateFormat > 1. (shalin)
 
+* SOLR-6631: DistributedQueue spinning on calling zookeeper getChildren()
+  (Jessica Cheng Mallet, Mark Miller, Timothy Potter)
+
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1635573&r1=1635572&r2=1635573&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Thu Oct 30 18:19:46 2014
@@ -43,8 +43,7 @@ import org.slf4j.LoggerFactory;
  * A distributed queue from zk recipes.
  */
 public class DistributedQueue {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(DistributedQueue.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
   
   private static long DEFAULT_TIMEOUT = 5*60*1000;
   
@@ -229,38 +228,50 @@ public class DistributedQueue {
       time.stop();
     }
   }
-  
-  
-  private class LatchChildWatcher implements Watcher {
-    
-    final Object lock;
-    private WatchedEvent event = null;
-    
-    public LatchChildWatcher() {
-      this.lock = new Object();
+
+  /**
+   * Watcher that blocks until a WatchedEvent occurs for a znode.
+   */
+  private final class LatchWatcher implements Watcher {
+
+    private final Object lock;
+    private WatchedEvent event;
+    private Event.EventType latchEventType;
+
+    LatchWatcher(Object lock) {
+      this(lock, null);
     }
 
-    public LatchChildWatcher(Object lock) {
+    LatchWatcher(Event.EventType eventType) {
+      this(new Object(), eventType);
+    }
+
+    LatchWatcher(Object lock, Event.EventType eventType) {
       this.lock = lock;
+      this.latchEventType = eventType;
     }
-    
+
     @Override
     public void process(WatchedEvent event) {
-      LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
-          + event.getState() + " type " + event.getType());
-      synchronized (lock) {
-        this.event = event;
-        lock.notifyAll();
+      Event.EventType eventType = event.getType();
+      // None events are ignored
+      // If latchEventType is not null, only fire if the type matches
+      if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
+        LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
+        synchronized (lock) {
+          this.event = event;
+          lock.notifyAll();
+        }
       }
     }
-    
+
     public void await(long timeout) throws InterruptedException {
       synchronized (lock) {
         if (this.event != null) return;
         lock.wait(timeout);
       }
     }
-    
+
     public WatchedEvent getWatchedEvent() {
       return event;
     }
@@ -268,13 +279,13 @@ public class DistributedQueue {
 
   // we avoid creating *many* watches in some cases
   // by saving the childrenWatcher and the children associated - see SOLR-6336
-  private LatchChildWatcher childrenWatcher;
+  private LatchWatcher childrenWatcher;
   private TreeMap<Long,String> fetchedChildren;
   private final Object childrenWatcherLock = new Object();
 
   private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
   {
-    LatchChildWatcher watcher;
+    LatchWatcher watcher;
     TreeMap<Long,String> children;
     synchronized (childrenWatcherLock) {
       watcher = childrenWatcher;
@@ -282,7 +293,8 @@ public class DistributedQueue {
     }
 
     if (watcher == null ||  watcher.getWatchedEvent() != null) {
-      watcher = new LatchChildWatcher();
+      // this watcher is only interested in child change events
+      watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
       while (true) {
         try {
           children = orderedChildren(watcher);
@@ -384,8 +396,9 @@ public class DistributedQueue {
       String watchID = createData(
           dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
           null, CreateMode.EPHEMERAL);
+
       Object lock = new Object();
-      LatchChildWatcher watcher = new LatchChildWatcher(lock);
+      LatchWatcher watcher = new LatchWatcher(lock);
       synchronized (lock) {
         if (zookeeper.exists(watchID, watcher, true) != null) {
           watcher.await(timeout);

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java?rev=1635573&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java Thu Oct 30 18:19:46 2014
@@ -0,0 +1,179 @@
+package org.apache.solr.cloud;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+import java.nio.charset.Charset;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises the basic operations of {@link DistributedQueue} (offer, peek, take, poll, remove)
+ * and its blocking / timed peek variants against a {@link ZkTestServer} started for each test.
+ */
+public class DistributedQueueTest extends SolrTestCaseJ4 {
+
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+
+  protected ZkTestServer zkServer;
+  protected SolrZkClient zkClient;
+
+  /**
+   * Sets the {@code solr.solrxml.location} system property to {@code zookeeper}.
+   * Despite its name this is a per-test {@code @Before} method, not a class-level hook.
+   */
+  @Before
+  public void beforeClass() {
+    System.setProperty("solr.solrxml.location", "zookeeper");
+  }
+
+  /** Runs the base-class setup and then starts the ZooKeeper test server and client. */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    setupZk();
+  }
+
+  /**
+   * Walks through the non-blocking queue operations, a blocking peek that is released by a
+   * background offer, and a timed peek that expires before the background offer happens.
+   */
+  @Test
+  public void testDistributedQueue() throws Exception {
+    String dqZNode = "/distqueue/test";
+    String testData = "hello world";
+    long timeoutMs = 500L;
+
+    DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
+
+    // basic ops
+    assertNull(dq.poll());
+    byte[] data = testData.getBytes(UTF8);
+    dq.offer(data);
+    assertEquals(testData, new String(dq.peek(), UTF8));
+    assertEquals(testData, new String(dq.take(), UTF8));
+    assertNull(dq.poll());
+    QueueEvent qe = dq.offer(data, timeoutMs);
+    assertNotNull(qe);
+    assertEquals(testData, new String(dq.remove(), UTF8));
+
+    // should block until the background thread makes the offer
+    QueueChangerThread qct = new QueueChangerThread(dq, 1000);
+    qct.start();
+    qe = dq.peek(true);
+    assertNotNull(qe);
+    dq.remove();
+    // do not leave the helper thread running past this step
+    qct.join();
+
+    // timeout scenario ... background thread won't offer until long after the peek times out
+    qct = new QueueChangerThread(dq, 1000);
+    qct.start();
+    try {
+      qe = dq.peek(500);
+      assertNull(qe);
+    } finally {
+      // cut the sleep short so the thread exits without offering, then wait for it to finish
+      qct.interrupt();
+      qct.join();
+    }
+  }
+
+  /** Background thread that sleeps for a fixed delay and then offers its own name to the queue. */
+  private static final class QueueChangerThread extends Thread {
+
+    private final DistributedQueue dq;
+    private final long waitBeforeOfferMs;
+
+    QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
+      this.dq = dq;
+      this.waitBeforeOfferMs = waitBeforeOfferMs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(waitBeforeOfferMs);
+        dq.offer(getName().getBytes(UTF8));
+      } catch (InterruptedException ie) {
+        // interrupted by the test to cancel the pending offer; just exit
+      } catch (Exception exc) {
+        throw new RuntimeException(exc);
+      }
+    }
+  }
+
+  /**
+   * Calls {@code clean} on {@code znodePath} if it already exists and then (re)creates the path.
+   *
+   * @param znodePath path of the znode that backs the queue under test
+   * @return {@code znodePath}, so the call can be used inline
+   */
+  protected String setupDistributedQueueZNode(String znodePath) throws Exception {
+    if (!zkClient.exists("/", true)) {
+      zkClient.makePath("/", false, true);
+    }
+    if (zkClient.exists(znodePath, true)) {
+      zkClient.clean(znodePath);
+    }
+    zkClient.makePath(znodePath, false, true);
+    return znodePath;
+  }
+
+  /** Runs the base-class teardown and always releases the ZooKeeper client and server. */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      // release ZooKeeper resources even when the base-class teardown fails,
+      // without hiding that failure
+      closeZk();
+    }
+  }
+
+  /** Starts a {@link ZkTestServer} on a temp directory and connects {@link #zkClient} to it. */
+  protected void setupZk() throws Exception {
+    System.setProperty("zkClientTimeout", "8000");
+    zkServer = new ZkTestServer(createTempDir("zkData").toFile().getAbsolutePath());
+    zkServer.run();
+    System.setProperty("zkHost", zkServer.getZkAddress());
+    zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
+    assertTrue(zkClient.isConnected());
+  }
+
+  /** Closes the client and shuts the server down; safe to call when setup did not complete. */
+  protected void closeZk() throws Exception {
+    try {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    } finally {
+      if (zkServer != null) {
+        zkServer.shutdown();
+      }
+    }
+  }
+}