You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2014/10/30 19:19:47 UTC
svn commit: r1635573 - in /lucene/dev/trunk/solr: CHANGES.txt
core/src/java/org/apache/solr/cloud/DistributedQueue.java
core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
Author: thelabdude
Date: Thu Oct 30 18:19:46 2014
New Revision: 1635573
URL: http://svn.apache.org/r1635573
Log:
SOLR-6631: Part deux - refactor LatchChildWatcher into LatchWatcher, which takes an optional EventType at construction so that the latch is released only when an event of that type is received.
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1635573&r1=1635572&r2=1635573&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Oct 30 18:19:46 2014
@@ -278,6 +278,10 @@ Bug Fixes
* SOLR-6591: Overseer can use stale cluster state and lose updates for collections
with stateFormat > 1. (shalin)
+* SOLR-6631: DistributedQueue spinning on calling zookeeper getChildren()
+ (Jessica Cheng Mallet, Mark Miller, Timothy Potter)
+
+
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1635573&r1=1635572&r2=1635573&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Thu Oct 30 18:19:46 2014
@@ -43,8 +43,7 @@ import org.slf4j.LoggerFactory;
* A distributed queue from zk recipes.
*/
public class DistributedQueue {
- private static final Logger LOG = LoggerFactory
- .getLogger(DistributedQueue.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
private static long DEFAULT_TIMEOUT = 5*60*1000;
@@ -229,38 +228,50 @@ public class DistributedQueue {
time.stop();
}
}
-
-
- private class LatchChildWatcher implements Watcher {
-
- final Object lock;
- private WatchedEvent event = null;
-
- public LatchChildWatcher() {
- this.lock = new Object();
+
+ /**
+ * Watcher that blocks until a WatchedEvent occurs for a znode.
+ */
+ private final class LatchWatcher implements Watcher {
+
+ private final Object lock;
+ private WatchedEvent event;
+ private Event.EventType latchEventType;
+
+ LatchWatcher(Object lock) {
+ this(lock, null);
}
- public LatchChildWatcher(Object lock) {
+ LatchWatcher(Event.EventType eventType) {
+ this(new Object(), eventType);
+ }
+
+ LatchWatcher(Object lock, Event.EventType eventType) {
this.lock = lock;
+ this.latchEventType = eventType;
}
-
+
@Override
public void process(WatchedEvent event) {
- LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
- + event.getState() + " type " + event.getType());
- synchronized (lock) {
- this.event = event;
- lock.notifyAll();
+ Event.EventType eventType = event.getType();
+ // None events are ignored
+ // If latchEventType is not null, only fire if the type matches
+ if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
+ LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
+ synchronized (lock) {
+ this.event = event;
+ lock.notifyAll();
+ }
}
}
-
+
public void await(long timeout) throws InterruptedException {
synchronized (lock) {
if (this.event != null) return;
lock.wait(timeout);
}
}
-
+
public WatchedEvent getWatchedEvent() {
return event;
}
@@ -268,13 +279,13 @@ public class DistributedQueue {
// we avoid creating *many* watches in some cases
// by saving the childrenWatcher and the children associated - see SOLR-6336
- private LatchChildWatcher childrenWatcher;
+ private LatchWatcher childrenWatcher;
private TreeMap<Long,String> fetchedChildren;
private final Object childrenWatcherLock = new Object();
private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
{
- LatchChildWatcher watcher;
+ LatchWatcher watcher;
TreeMap<Long,String> children;
synchronized (childrenWatcherLock) {
watcher = childrenWatcher;
@@ -282,7 +293,8 @@ public class DistributedQueue {
}
if (watcher == null || watcher.getWatchedEvent() != null) {
- watcher = new LatchChildWatcher();
+ // this watcher is only interested in child change events
+ watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
while (true) {
try {
children = orderedChildren(watcher);
@@ -384,8 +396,9 @@ public class DistributedQueue {
String watchID = createData(
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
null, CreateMode.EPHEMERAL);
+
Object lock = new Object();
- LatchChildWatcher watcher = new LatchChildWatcher(lock);
+ LatchWatcher watcher = new LatchWatcher(lock);
synchronized (lock) {
if (zookeeper.exists(watchID, watcher, true) != null) {
watcher.await(timeout);
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java?rev=1635573&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java Thu Oct 30 18:19:46 2014
@@ -0,0 +1,139 @@
+package org.apache.solr.cloud;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+import java.nio.charset.Charset;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DistributedQueueTest extends SolrTestCaseJ4 { // exercises DistributedQueue basic ops plus blocking and timed peek against a test ZooKeeper server
+
+ private static final Charset UTF8 = Charset.forName("UTF-8"); // charset used for every String <-> byte[] conversion in this test
+
+ protected ZkTestServer zkServer; // created in setupZk(), shut down in closeZk()
+ protected SolrZkClient zkClient; // client connected to zkServer; handed to the DistributedQueue under test
+
+ @Before
+ public void beforeClass() { // NOTE(review): annotated @Before, so despite its name this runs before each test method
+ System.setProperty("solr.solrxml.location", "zookeeper");
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception { // base-class setup first, then start ZooKeeper and connect the client
+ super.setUp();
+ setupZk();
+ }
+
+ @Test
+ public void testDistributedQueue() throws Exception { // walks through poll/offer/peek/take/remove, then a blocking peek and a peek that times out
+ String dqZNode = "/distqueue/test";
+ String testData = "hello world";
+ long timeoutMs = 500L; // timeout handed to the two-argument offer below
+
+ DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
+
+ // basic ops
+ assertTrue(dq.poll() == null); // nothing has been offered yet, so poll is expected to return null
+ byte[] data = testData.getBytes(UTF8);
+ dq.offer(data);
+ assertEquals(new String(dq.peek(),UTF8), testData); // NOTE(review): arguments are passed as (actual, expected)
+ assertEquals(new String(dq.take(),UTF8), testData);
+ assertTrue(dq.poll() == null); // the test expects the queue to be empty again after take()
+ QueueEvent qe = dq.offer(data, timeoutMs);
+ assertNotNull(qe);
+ assertEquals(new String(dq.remove(),UTF8), testData);
+
+ // should block until the background thread makes the offer
+ (new QueueChangerThread(dq, 1000)).start(); // background offer is made after a 1000 ms sleep
+ qe = dq.peek(true);
+ assertNotNull(qe);
+ dq.remove(); // discard the element offered by the background thread
+
+ // timeout scenario ... background thread won't offer until long after the peek times out
+ QueueChangerThread qct = new QueueChangerThread(dq, 1000);
+ qct.start();
+ qe = dq.peek(500); // 500 ms wait, shorter than the helper thread's 1000 ms sleep
+ assertTrue(qe == null);
+
+ try {
+ qct.interrupt(); // NOTE(review): the thread is interrupted but never joined before the test returns
+ } catch (Exception exc) {} // NOTE(review): any exception raised here is silently discarded
+ }
+
+ private class QueueChangerThread extends Thread { // helper thread: sleeps for a fixed delay, then offers its own thread name to the queue
+
+ DistributedQueue dq; // queue that receives the delayed offer
+ long waitBeforeOfferMs; // sleep duration, in milliseconds, before the offer is made
+
+ QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
+ this.dq = dq;
+ this.waitBeforeOfferMs = waitBeforeOfferMs;
+ }
+
+ public void run() {
+ try {
+ Thread.sleep(waitBeforeOfferMs);
+ dq.offer(getName().getBytes(UTF8)); // payload is this thread's name encoded as UTF-8
+ } catch (InterruptedException ie) { // interruption ends the thread quietly
+ // do nothing
+ } catch (Exception exc) { // any other failure is rethrown unchecked with the original as its cause
+ throw new RuntimeException(exc);
+ }
+ }
+ }
+
+ protected String setupDistributedQueueZNode(String znodePath) throws Exception { // ensures "/" exists, calls clean() on an existing znodePath, then makePath(); returns znodePath unchanged
+ if (!zkClient.exists("/", true))
+ zkClient.makePath("/", false, true);
+ if (zkClient.exists(znodePath, true))
+ zkClient.clean(znodePath);
+ zkClient.makePath(znodePath, false, true);
+ return znodePath;
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception { // runs the base-class teardown, then always closes the ZooKeeper client and server
+ try {
+ super.tearDown();
+ } catch (Exception exc) {} // NOTE(review): failures from super.tearDown() are swallowed so that closeZk() still runs
+ closeZk();
+ }
+
+ protected void setupZk() throws Exception { // starts a ZkTestServer on a temp directory and connects zkClient to it
+ System.setProperty("zkClientTimeout", "8000");
+ zkServer = new ZkTestServer(createTempDir("zkData").toFile().getAbsolutePath());
+ zkServer.run();
+ System.setProperty("zkHost", zkServer.getZkAddress());
+ zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
+ assertTrue(zkClient.isConnected()); // fail fast if the client did not connect
+ }
+
+ protected void closeZk() throws Exception { // closes the client if one was created, then shuts the server down
+ if (zkClient != null)
+ zkClient.close();
+ zkServer.shutdown(); // NOTE(review): unlike zkClient, zkServer is not null-checked here
+ }
+}