You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2014/10/29 15:18:58 UTC
svn commit: r1635142 - in /lucene/dev/branches/branch_5x/solr: CHANGES.txt
core/src/java/org/apache/solr/cloud/DistributedQueue.java
core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
Author: thelabdude
Date: Wed Oct 29 14:18:57 2014
New Revision: 1635142
URL: http://svn.apache.org/r1635142
Log:
SOLR-6631: DistributedQueue spinning on calling zookeeper getChildren()
Added:
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java (with props)
Modified:
lucene/dev/branches/branch_5x/solr/CHANGES.txt
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1635142&r1=1635141&r2=1635142&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Wed Oct 29 14:18:57 2014
@@ -251,6 +251,9 @@ Bug Fixes
* SOLR-6591: Overseer can use stale cluster state and lose updates for collections
with stateFormat > 1. (shalin)
+* SOLR-6631: DistributedQueue spinning on calling zookeeper getChildren()
+ (Jessica Cheng Mallet, Mark Miller, Timothy Potter)
+
Optimizations
----------------------
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1635142&r1=1635141&r2=1635142&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Wed Oct 29 14:18:57 2014
@@ -246,11 +246,14 @@ public class DistributedQueue {
@Override
public void process(WatchedEvent event) {
+ Event.EventType eventType = event.getType();
LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
- + event.getState() + " type " + event.getType());
- synchronized (lock) {
- this.event = event;
- lock.notifyAll();
+ + event.getState() + " type " + eventType);
+ if (eventType == Event.EventType.NodeChildrenChanged) {
+ synchronized (lock) {
+ this.event = event;
+ lock.notifyAll();
+ }
}
}
Added: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java?rev=1635142&view=auto
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java (added)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java Wed Oct 29 14:18:57 2014
@@ -0,0 +1,131 @@
+package org.apache.solr.cloud;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises {@link DistributedQueue} against a ZooKeeper test server started for each test:
+ * basic offer/peek/take/poll/remove calls, a blocking peek that is released by an offer made
+ * from another thread, and a timed peek that is expected to return {@code null}.
+ */
+public class DistributedQueueTest extends SolrTestCaseJ4 {
+
+  /** Upper bound, in milliseconds, on how long the test waits for a helper thread to exit. */
+  private static final long THREAD_JOIN_TIMEOUT_MS = 10000L;
+
+  protected ZkTestServer zkServer;
+  protected SolrZkClient zkClient;
+
+  /** Runs the base-class setup and then starts the ZooKeeper server and client. */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    setupZk();
+  }
+
+  /**
+   * Covers the basic queue operations, a blocking peek and a timed-out peek.
+   *
+   * @throws Exception if any queue or ZooKeeper call fails
+   */
+  @Test
+  public void testDistributedQueue() throws Exception {
+    String dqZNode = "/distqueue/test";
+    String testData = "hello world";
+    long timeoutMs = 500L;
+
+    DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
+
+    // basic ops
+    assertNull(dq.poll());
+    byte[] data = testData.getBytes("UTF-8");
+    dq.offer(data);
+    assertEquals(testData, new String(dq.peek(), "UTF-8"));
+    assertEquals(testData, new String(dq.take(), "UTF-8"));
+    assertNull(dq.poll());
+    QueueEvent qe = dq.offer(data, timeoutMs);
+    assertNotNull(qe);
+    assertEquals(testData, new String(dq.remove(), "UTF-8"));
+
+    // should block until the background thread makes the offer
+    QueueChangerThread delayedOffer = new QueueChangerThread(dq, 1000);
+    delayedOffer.start();
+    try {
+      qe = dq.peek(true);
+      assertNotNull(qe);
+      dq.remove();
+    } finally {
+      // never leave the helper thread running past the end of the test
+      delayedOffer.join(THREAD_JOIN_TIMEOUT_MS);
+    }
+
+    // timeout scenario ... background thread won't offer until long after the peek times out
+    QueueChangerThread lateOffer = new QueueChangerThread(dq, 1000);
+    lateOffer.start();
+    try {
+      qe = dq.peek(500);
+      assertNull(qe);
+    } finally {
+      // cancel the pending offer even when the assertion above fails, then wait for the thread
+      lateOffer.interrupt();
+      lateOffer.join(THREAD_JOIN_TIMEOUT_MS);
+    }
+  }
+
+  /**
+   * Helper thread that sleeps for a fixed delay and then offers its own thread name
+   * (UTF-8 encoded) to the queue. Interrupting the thread makes it exit.
+   */
+  private static class QueueChangerThread extends Thread {
+
+    private final DistributedQueue dq;
+    private final long waitBeforeOfferMs;
+
+    QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
+      this.dq = dq;
+      this.waitBeforeOfferMs = waitBeforeOfferMs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(waitBeforeOfferMs);
+        dq.offer(getName().getBytes("UTF-8"));
+      } catch (InterruptedException ie) {
+        // interrupted by the test to cancel the offer; restore the flag and exit
+        Thread.currentThread().interrupt();
+      } catch (Exception exc) {
+        throw new RuntimeException(exc);
+      }
+    }
+  }
+
+  /**
+   * Creates the root path if it is missing, cleans {@code znodePath} if it already exists,
+   * and then creates {@code znodePath}.
+   *
+   * @param znodePath path of the znode that will back the queue
+   * @return {@code znodePath}, unchanged
+   * @throws Exception if a ZooKeeper call fails
+   */
+  protected String setupDistributedQueueZNode(String znodePath) throws Exception {
+    if (!zkClient.exists("/", true)) {
+      zkClient.makePath("/", false, true);
+    }
+    if (zkClient.exists(znodePath, true)) {
+      zkClient.clean(znodePath);
+    }
+    zkClient.makePath(znodePath, false, true);
+    return znodePath;
+  }
+
+  /**
+   * Runs the base-class teardown and always closes the ZooKeeper client and server afterwards,
+   * letting any teardown failure propagate instead of being silently dropped.
+   */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      closeZk();
+    }
+  }
+
+  /**
+   * Starts a ZooKeeper test server on a fresh temp directory, sets the {@code zkClientTimeout}
+   * and {@code zkHost} system properties, and connects {@link #zkClient} to the server.
+   *
+   * @throws Exception if the server cannot be started or the client cannot connect
+   */
+  protected void setupZk() throws Exception {
+    System.setProperty("zkClientTimeout", "8000");
+    zkServer = new ZkTestServer(createTempDir("zkData").toFile().getAbsolutePath());
+    zkServer.run();
+    System.setProperty("zkHost", zkServer.getZkAddress());
+    zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
+    assertTrue(zkClient.isConnected());
+  }
+
+  /**
+   * Closes the client and shuts down the server. Either may be {@code null} when setup failed
+   * part-way; the server is shut down even if closing the client throws.
+   *
+   * @throws Exception if closing the client or shutting down the server fails
+   */
+  protected void closeZk() throws Exception {
+    try {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    } finally {
+      if (zkServer != null) {
+        zkServer.shutdown();
+      }
+    }
+  }
+}