You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/16 19:01:09 UTC

[lucene-solr] 02/03: @1234 Cut unused dist queue stuff.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit d5db80ddf071bbe81c2d31a422f48e4319697e7f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 16 12:12:17 2020 -0600

    @1234 Cut unused dist queue stuff.
---
 .../org/apache/solr/cloud/ZkDistributedQueue.java  | 297 +--------------------
 .../apache/solr/cloud/DistributedQueueTest.java    |  22 +-
 2 files changed, 18 insertions(+), 301 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index dba4487..70a61e5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -17,8 +17,6 @@
 package org.apache.solr.cloud;
 
 import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.solr.client.solrj.cloud.DistributedQueue;
 import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -27,7 +25,6 @@ import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -44,7 +41,6 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -143,12 +139,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    */
   @Override
   public byte[] peek() throws KeeperException, InterruptedException {
-    Timer.Context time = stats.time(dir + "_peek");
-    try {
-      return firstElement();
-    } finally {
-      time.stop();
-    }
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -160,7 +151,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    */
   @Override
   public byte[] peek(boolean block) throws KeeperException, InterruptedException {
-    return block ? peek(Long.MAX_VALUE) : peek();
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -172,28 +163,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    */
   @Override
   public byte[] peek(long wait) throws KeeperException, InterruptedException {
-    Preconditions.checkArgument(wait > 0);
-    Timer.Context time;
-    if (wait == Long.MAX_VALUE) {
-      time = stats.time(dir + "_peek_wait_forever");
-    } else {
-      time = stats.time(dir + "_peek_wait" + wait);
-    }
-    updateLock.lockInterruptibly();
-    try {
-      long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
-      while (waitNanos > 0) {
-        byte[] result = firstElement();
-        if (result != null) {
-          return result;
-        }
-        waitNanos = changed.awaitNanos(waitNanos);
-      }
-      return null;
-    } finally {
-      updateLock.unlock();
-      time.stop();
-    }
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -204,12 +174,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    */
   @Override
   public byte[] poll() throws KeeperException, InterruptedException {
-    Timer.Context time = stats.time(dir + "_poll");
-    try {
-      return removeFirst();
-    } finally {
-      time.stop();
-    }
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -307,21 +272,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    */
   @Override
   public byte[] take() throws KeeperException, InterruptedException {
-    // Same as for element. Should refactor this.
-    Timer.Context timer = stats.time(dir + "_take");
-    updateLock.lockInterruptibly();
-    try {
-      while (true) {
-        byte[] result = removeFirst();
-        if (result != null) {
-          return result;
-        }
-        changed.await();
-      }
-    } finally {
-      updateLock.unlock();
-      timer.stop();
-    }
+    throw new UnsupportedOperationException();
   }
 
   private static Set<String> OPERATIONS = new HashSet<>();
@@ -338,48 +289,9 @@ public class ZkDistributedQueue implements DistributedQueue {
    */
   @Override
   public void offer(byte[] data) throws KeeperException, InterruptedException {
-    // TODO change to accept json
-    Map json = (Map) Utils.fromJSON(data);
-
-    Timer.Context time = stats.time(dir + "_offer");
-
-    try {
-      while (true) {
-        try {
-          if (maxQueueSize > 0) {
-            if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) { // nocommit review
-              // If a max queue size is set, check it before creating a new queue item.
-              Stat stat = zookeeper.exists(dir, null, true);
-              if (stat == null) {
-                // jump to the code below, which tries to create dir if it doesn't exist
-                throw new KeeperException.NoNodeException();
-              }
-              int remainingCapacity = maxQueueSize - stat.getNumChildren();
-              if (remainingCapacity <= 0) {
-                throw new IllegalStateException("queue is full");
-              }
-
-              // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
-              offerPermits.set(remainingCapacity / 100);
-            }
-          }
-
-          // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
-          // This will get set again when the watcher actually fires, but that's ok.
-          zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
-          isDirty = true;
-          return;
-        } catch (KeeperException.NoNodeException e) {
-          try {
-            zookeeper.create(dir, DATA, CreateMode.PERSISTENT, true);
-          } catch (KeeperException.NodeExistsException ne) {
-            // someone created it
-          }
-        }
-      }
-    } finally {
-      time.stop();
-    }
+    // TODO - if too many items on the queue, just block
+    zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+    return;
   }
 
   public Stats getZkStats() {
@@ -418,68 +330,14 @@ public class ZkDistributedQueue implements DistributedQueue {
    * list is inherently stale.
    */
   private String firstChild(boolean remove, boolean refetchIfDirty) throws KeeperException, InterruptedException {
-    updateLock.lockInterruptibly();
-    try {
-     if (log.isDebugEnabled()) log.debug("firstChild isDirty={}, refetchIfDirty={}, remove={}", isDirty, refetchIfDirty, remove);
-      // We always return from cache first, the cache will be cleared if the node is not exist
-      if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
-        if (log.isDebugEnabled()) log.debug("returning first child");
-        return remove ? knownChildren.pollFirst() : knownChildren.first();
-      }
-
-      if (!isDirty && knownChildren.isEmpty()) {
-        if (log.isDebugEnabled()) log.debug("isDirty={} knownChildren is empty, returning null", isDirty);
-        return null;
-      }
-
-      // Dirty, try to fetch an updated list of children from ZK.
-      // Only set a new watcher if there isn't already a watcher.
-
-      if (log.isDebugEnabled()) log.debug("isDirty={} fetch known children", isDirty);
-      ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
-      knownChildren = fetchZkChildren(newWatcher);
-      if (newWatcher != null) {
-        watcherCount++; // watcher was successfully set
-      }
-      isDirty = false;
-      if (knownChildren.isEmpty()) {
-        if (log.isDebugEnabled()) log.debug("known children is empty return null");
-        return null;
-      }
-      if (log.isDebugEnabled()) log.debug("signal all and get first, remove={}", remove);
-      changed.signalAll();
-      return remove ? knownChildren.pollFirst() : knownChildren.first();
-    } finally {
-      updateLock.unlock();
-    }
+    throw new UnsupportedOperationException();
   }
 
   /**
    * Return the current set of children from ZK; does not change internal state.
    */
   TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
-    while (true) {
-      try {
-        TreeSet<String> orderedChildren = new TreeSet<>();
-        if (log.isDebugEnabled()) log.debug("fetchZkChildren {}", dir);
-        List<String> childNames = zookeeper.getChildren(dir, watcher, true);
-        if (log.isDebugEnabled()) log.debug("got {} nodes", childNames);
-        stats.setQueueLength(childNames.size());
-        for (String childName : childNames) {
-          // Check format
-          if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
-            log.debug("Found child node with improper name: {}", childName);
-            continue;
-          }
-          orderedChildren.add(childName);
-        }
-        if (log.isDebugEnabled()) log.debug("returning {}", orderedChildren);
-        return orderedChildren;
-      } catch (KeeperException.NoNodeException e) {
-        zookeeper.makePath(dir, false, true);
-        // go back to the loop and try again
-      }
-    }
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -490,98 +348,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    */
   @Override
   public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
-    List<String> foundChildren = new ArrayList<>();
-    long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
-    if (log.isDebugEnabled()) log.debug("peekElements {} {}", max, waitMillis);
-    boolean first = true;
-    while (true) {
-      // Trigger a refresh, but only force it if this is not the first iteration.
-      if (log.isDebugEnabled()) log.debug("call first child {}", !first);
-      firstChild(false, first);
-
-      updateLock.lockInterruptibly();
-      try {
-        for (String child : knownChildren) {
-          if (acceptFilter.test(child)) {
-            foundChildren.add(child);
-          }
-        }
-        if (!foundChildren.isEmpty()) {
-          break;
-        }
-        if (waitNanos <= 0) {
-          break;
-        }
-
-        // If this is our first time through, force a refresh before waiting.
-        if (first) {
-          first = false;
-          continue;
-        }
-
-        waitNanos = changed.awaitNanos(waitNanos);
-      } finally {
-        updateLock.unlock();
-      }
-
-      if (!foundChildren.isEmpty()) {
-        break;
-      }
-    }
-
-    // Technically we could restart the method if we fail to actually obtain any valid children
-    // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
-    // much more sophisticated waitNanos tracking.
-    List<Pair<String, byte[]>> result = new ArrayList<>();
-    for (String child : foundChildren) {
-      if (result.size() >= max) {
-        break;
-      }
-      try {
-        byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
-        result.add(new Pair<>(child, data));
-      } catch (KeeperException.NoNodeException e) {
-        // Another client deleted the node first, remove the in-memory and continue.
-        updateLock.lockInterruptibly();
-        try {
-          knownChildren.remove(child);
-        } finally {
-          updateLock.unlock();
-        }
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Return the head of the queue without modifying the queue.
-   *
-   * @return the data at the head of the queue.
-   */
-  private byte[] firstElement() throws KeeperException, InterruptedException {
-    while (true) {
-      if (log.isDebugEnabled()) log.debug("calling firstElement");
-      String firstChild = firstChild(false, false);
-      if (firstChild == null) {
-        if (log.isDebugEnabled()) log.debug("return null");
-        return null;
-      }
-      try {
-        if (log.isDebugEnabled()) log.debug("get data {}", dir + "/" + firstChild);
-        return zookeeper.getData(dir + "/" + firstChild, null, null, true);
-      } catch (KeeperException.NoNodeException e) {
-        if (log.isDebugEnabled()) log.debug("no node found {}, clear known children and set dirty", dir + "/" + firstChild);
-        // Another client deleted the node first, remove the in-memory and retry.
-        updateLock.lockInterruptibly();
-        try {
-          // Efficient only for single-consumer
-          knownChildren.clear();
-          isDirty = true;
-        } finally {
-          updateLock.unlock();
-        }
-      }
-    }
+    throw new UnsupportedOperationException();
   }
 
   private byte[] removeFirst() throws KeeperException, InterruptedException {
@@ -609,46 +376,4 @@ public class ZkDistributedQueue implements DistributedQueue {
       }
     }
   }
-
-  @VisibleForTesting int watcherCount() throws InterruptedException {
-    updateLock.lockInterruptibly();
-    try {
-      return watcherCount;
-    } finally {
-      updateLock.unlock();
-    }
-  }
-
-  @VisibleForTesting boolean isDirty() throws InterruptedException {
-    updateLock.lockInterruptibly();
-    try {
-
-      if (log.isDebugEnabled()) log.debug("isDirty {}", isDirty);
-      return isDirty;
-    } finally {
-      updateLock.unlock();
-    }
-  }
-
-  @VisibleForTesting class ChildWatcher implements Watcher {
-
-    @Override
-    public void process(WatchedEvent event) {
-
-      if (log.isDebugEnabled()) log.debug("ChildWatcher fired {} {}", event.getType(), event.getPath());
-      // session events are not change events, and do not remove the watcher; except for Expired
-      if (Event.EventType.None.equals(event.getType()) && !Event.KeeperState.Expired.equals(event.getState())) {
-        return;
-      }
-      updateLock.lock();
-      try {
-        isDirty = true;
-        watcherCount--;
-        // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
-        changed.signalAll();
-      } finally {
-        updateLock.unlock();
-      }
-    }
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 449ff6b..470a10f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -143,12 +143,9 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     assertNull(dq.peek(100));
     
 
-   // assertFalse(dq.isDirty());
-    assertEquals(1, dq.watcherCount());
 
     forceSessionExpire();
 
-    assertEquals(0, dq.watcherCount());
 
     // Rerun the earlier test make sure updates are still seen, post reconnection.
     future = testExecutor.submit(() -> new String(dq.peek(true), UTF8));
@@ -171,28 +168,23 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
     String dqZNode = "/distqueue/test";
     ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
     assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
-    assertEquals(1, dq.watcherCount());
+
     assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
-    assertEquals(1, dq.watcherCount());
+
     assertNull(dq.peek());
-    assertEquals(1, dq.watcherCount());
+
     assertNull(dq.peek(1));
-    assertEquals(1, dq.watcherCount());
+
 
     dq.offer("hello world".getBytes(UTF8));
     assertNotNull(dq.peek()); // synchronously available
     // dirty and watcher state indeterminate here, race with watcher
     Thread.sleep(100); // watcher should have fired now
     assertNotNull(dq.peek());
-    // in case of race condition, childWatcher is kicked off after peek()
-    if (dq.watcherCount() == 0) {
-      dq.poll();
-      dq.offer("hello world".getBytes(UTF8));
-      dq.peek();
-    }
-    assertEquals(1, dq.watcherCount());
+
+
     assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
-    assertEquals(1, dq.watcherCount());
+
   }
 
   @Test