You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/16 19:01:09 UTC
[lucene-solr] 02/03: @1234 Cut unused dist queue stuff.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit d5db80ddf071bbe81c2d31a422f48e4319697e7f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 16 12:12:17 2020 -0600
@1234 Cut unused dist queue stuff.
---
.../org/apache/solr/cloud/ZkDistributedQueue.java | 297 +--------------------
.../apache/solr/cloud/DistributedQueueTest.java | 22 +-
2 files changed, 18 insertions(+), 301 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index dba4487..70a61e5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -17,8 +17,6 @@
package org.apache.solr.cloud;
import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -27,7 +25,6 @@ import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -44,7 +41,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -143,12 +139,7 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public byte[] peek() throws KeeperException, InterruptedException {
- Timer.Context time = stats.time(dir + "_peek");
- try {
- return firstElement();
- } finally {
- time.stop();
- }
+ throw new UnsupportedOperationException();
}
/**
@@ -160,7 +151,7 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public byte[] peek(boolean block) throws KeeperException, InterruptedException {
- return block ? peek(Long.MAX_VALUE) : peek();
+ throw new UnsupportedOperationException();
}
/**
@@ -172,28 +163,7 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public byte[] peek(long wait) throws KeeperException, InterruptedException {
- Preconditions.checkArgument(wait > 0);
- Timer.Context time;
- if (wait == Long.MAX_VALUE) {
- time = stats.time(dir + "_peek_wait_forever");
- } else {
- time = stats.time(dir + "_peek_wait" + wait);
- }
- updateLock.lockInterruptibly();
- try {
- long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
- while (waitNanos > 0) {
- byte[] result = firstElement();
- if (result != null) {
- return result;
- }
- waitNanos = changed.awaitNanos(waitNanos);
- }
- return null;
- } finally {
- updateLock.unlock();
- time.stop();
- }
+ throw new UnsupportedOperationException();
}
/**
@@ -204,12 +174,7 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public byte[] poll() throws KeeperException, InterruptedException {
- Timer.Context time = stats.time(dir + "_poll");
- try {
- return removeFirst();
- } finally {
- time.stop();
- }
+ throw new UnsupportedOperationException();
}
/**
@@ -307,21 +272,7 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public byte[] take() throws KeeperException, InterruptedException {
- // Same as for element. Should refactor this.
- Timer.Context timer = stats.time(dir + "_take");
- updateLock.lockInterruptibly();
- try {
- while (true) {
- byte[] result = removeFirst();
- if (result != null) {
- return result;
- }
- changed.await();
- }
- } finally {
- updateLock.unlock();
- timer.stop();
- }
+ throw new UnsupportedOperationException();
}
private static Set<String> OPERATIONS = new HashSet<>();
@@ -338,48 +289,9 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public void offer(byte[] data) throws KeeperException, InterruptedException {
- // TODO change to accept json
- Map json = (Map) Utils.fromJSON(data);
-
- Timer.Context time = stats.time(dir + "_offer");
-
- try {
- while (true) {
- try {
- if (maxQueueSize > 0) {
- if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) { // nocommit review
- // If a max queue size is set, check it before creating a new queue item.
- Stat stat = zookeeper.exists(dir, null, true);
- if (stat == null) {
- // jump to the code below, which tries to create dir if it doesn't exist
- throw new KeeperException.NoNodeException();
- }
- int remainingCapacity = maxQueueSize - stat.getNumChildren();
- if (remainingCapacity <= 0) {
- throw new IllegalStateException("queue is full");
- }
-
- // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
- offerPermits.set(remainingCapacity / 100);
- }
- }
-
- // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
- // This will get set again when the watcher actually fires, but that's ok.
- zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
- isDirty = true;
- return;
- } catch (KeeperException.NoNodeException e) {
- try {
- zookeeper.create(dir, DATA, CreateMode.PERSISTENT, true);
- } catch (KeeperException.NodeExistsException ne) {
- // someone created it
- }
- }
- }
- } finally {
- time.stop();
- }
+ // TODO - if too many items on the queue, just block
+ zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+ return;
}
public Stats getZkStats() {
@@ -418,68 +330,14 @@ public class ZkDistributedQueue implements DistributedQueue {
* list is inherently stale.
*/
private String firstChild(boolean remove, boolean refetchIfDirty) throws KeeperException, InterruptedException {
- updateLock.lockInterruptibly();
- try {
- if (log.isDebugEnabled()) log.debug("firstChild isDirty={}, refetchIfDirty={}, remove={}", isDirty, refetchIfDirty, remove);
- // We always return from cache first, the cache will be cleared if the node is not exist
- if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
- if (log.isDebugEnabled()) log.debug("returning first child");
- return remove ? knownChildren.pollFirst() : knownChildren.first();
- }
-
- if (!isDirty && knownChildren.isEmpty()) {
- if (log.isDebugEnabled()) log.debug("isDirty={} knownChildren is empty, returning null", isDirty);
- return null;
- }
-
- // Dirty, try to fetch an updated list of children from ZK.
- // Only set a new watcher if there isn't already a watcher.
-
- if (log.isDebugEnabled()) log.debug("isDirty={} fetch known children", isDirty);
- ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
- knownChildren = fetchZkChildren(newWatcher);
- if (newWatcher != null) {
- watcherCount++; // watcher was successfully set
- }
- isDirty = false;
- if (knownChildren.isEmpty()) {
- if (log.isDebugEnabled()) log.debug("known children is empty return null");
- return null;
- }
- if (log.isDebugEnabled()) log.debug("signal all and get first, remove={}", remove);
- changed.signalAll();
- return remove ? knownChildren.pollFirst() : knownChildren.first();
- } finally {
- updateLock.unlock();
- }
+ throw new UnsupportedOperationException();
}
/**
* Return the current set of children from ZK; does not change internal state.
*/
TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
- while (true) {
- try {
- TreeSet<String> orderedChildren = new TreeSet<>();
- if (log.isDebugEnabled()) log.debug("fetchZkChildren {}", dir);
- List<String> childNames = zookeeper.getChildren(dir, watcher, true);
- if (log.isDebugEnabled()) log.debug("got {} nodes", childNames);
- stats.setQueueLength(childNames.size());
- for (String childName : childNames) {
- // Check format
- if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
- log.debug("Found child node with improper name: {}", childName);
- continue;
- }
- orderedChildren.add(childName);
- }
- if (log.isDebugEnabled()) log.debug("returning {}", orderedChildren);
- return orderedChildren;
- } catch (KeeperException.NoNodeException e) {
- zookeeper.makePath(dir, false, true);
- // go back to the loop and try again
- }
- }
+ throw new UnsupportedOperationException();
}
/**
@@ -490,98 +348,7 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
- List<String> foundChildren = new ArrayList<>();
- long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
- if (log.isDebugEnabled()) log.debug("peekElements {} {}", max, waitMillis);
- boolean first = true;
- while (true) {
- // Trigger a refresh, but only force it if this is not the first iteration.
- if (log.isDebugEnabled()) log.debug("call first child {}", !first);
- firstChild(false, first);
-
- updateLock.lockInterruptibly();
- try {
- for (String child : knownChildren) {
- if (acceptFilter.test(child)) {
- foundChildren.add(child);
- }
- }
- if (!foundChildren.isEmpty()) {
- break;
- }
- if (waitNanos <= 0) {
- break;
- }
-
- // If this is our first time through, force a refresh before waiting.
- if (first) {
- first = false;
- continue;
- }
-
- waitNanos = changed.awaitNanos(waitNanos);
- } finally {
- updateLock.unlock();
- }
-
- if (!foundChildren.isEmpty()) {
- break;
- }
- }
-
- // Technically we could restart the method if we fail to actually obtain any valid children
- // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
- // much more sophisticated waitNanos tracking.
- List<Pair<String, byte[]>> result = new ArrayList<>();
- for (String child : foundChildren) {
- if (result.size() >= max) {
- break;
- }
- try {
- byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
- result.add(new Pair<>(child, data));
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first, remove the in-memory and continue.
- updateLock.lockInterruptibly();
- try {
- knownChildren.remove(child);
- } finally {
- updateLock.unlock();
- }
- }
- }
- return result;
- }
-
- /**
- * Return the head of the queue without modifying the queue.
- *
- * @return the data at the head of the queue.
- */
- private byte[] firstElement() throws KeeperException, InterruptedException {
- while (true) {
- if (log.isDebugEnabled()) log.debug("calling firstElement");
- String firstChild = firstChild(false, false);
- if (firstChild == null) {
- if (log.isDebugEnabled()) log.debug("return null");
- return null;
- }
- try {
- if (log.isDebugEnabled()) log.debug("get data {}", dir + "/" + firstChild);
- return zookeeper.getData(dir + "/" + firstChild, null, null, true);
- } catch (KeeperException.NoNodeException e) {
- if (log.isDebugEnabled()) log.debug("no node found {}, clear known children and set dirty", dir + "/" + firstChild);
- // Another client deleted the node first, remove the in-memory and retry.
- updateLock.lockInterruptibly();
- try {
- // Efficient only for single-consumer
- knownChildren.clear();
- isDirty = true;
- } finally {
- updateLock.unlock();
- }
- }
- }
+ throw new UnsupportedOperationException();
}
private byte[] removeFirst() throws KeeperException, InterruptedException {
@@ -609,46 +376,4 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
}
-
- @VisibleForTesting int watcherCount() throws InterruptedException {
- updateLock.lockInterruptibly();
- try {
- return watcherCount;
- } finally {
- updateLock.unlock();
- }
- }
-
- @VisibleForTesting boolean isDirty() throws InterruptedException {
- updateLock.lockInterruptibly();
- try {
-
- if (log.isDebugEnabled()) log.debug("isDirty {}", isDirty);
- return isDirty;
- } finally {
- updateLock.unlock();
- }
- }
-
- @VisibleForTesting class ChildWatcher implements Watcher {
-
- @Override
- public void process(WatchedEvent event) {
-
- if (log.isDebugEnabled()) log.debug("ChildWatcher fired {} {}", event.getType(), event.getPath());
- // session events are not change events, and do not remove the watcher; except for Expired
- if (Event.EventType.None.equals(event.getType()) && !Event.KeeperState.Expired.equals(event.getState())) {
- return;
- }
- updateLock.lock();
- try {
- isDirty = true;
- watcherCount--;
- // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
- changed.signalAll();
- } finally {
- updateLock.unlock();
- }
- }
- }
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 449ff6b..470a10f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -143,12 +143,9 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertNull(dq.peek(100));
- // assertFalse(dq.isDirty());
- assertEquals(1, dq.watcherCount());
forceSessionExpire();
- assertEquals(0, dq.watcherCount());
// Rerun the earlier test make sure updates are still seen, post reconnection.
future = testExecutor.submit(() -> new String(dq.peek(true), UTF8));
@@ -171,28 +168,23 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
String dqZNode = "/distqueue/test";
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
- assertEquals(1, dq.watcherCount());
+
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
- assertEquals(1, dq.watcherCount());
+
assertNull(dq.peek());
- assertEquals(1, dq.watcherCount());
+
assertNull(dq.peek(1));
- assertEquals(1, dq.watcherCount());
+
dq.offer("hello world".getBytes(UTF8));
assertNotNull(dq.peek()); // synchronously available
// dirty and watcher state indeterminate here, race with watcher
Thread.sleep(100); // watcher should have fired now
assertNotNull(dq.peek());
- // in case of race condition, childWatcher is kicked off after peek()
- if (dq.watcherCount() == 0) {
- dq.poll();
- dq.offer("hello world".getBytes(UTF8));
- dq.peek();
- }
- assertEquals(1, dq.watcherCount());
+
+
assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
- assertEquals(1, dq.watcherCount());
+
}
@Test